Comparing the performance of RoR and Go workers in the background processing. Part 2 — Pure Ruby and Go workers

class RubyCalculatorNoAr
include Sidekiq::Worker

def perform(task_id)
# Task processing time start
time = Time.now
rows = get_page_for_task(task_id)
return if rows.count.zero?
total_avg = page_avg(rows)
create_survey_result(task_id, total_avg) # Task processing duration
duration = Time.now - time

Statistic.create(task_id: task_id, handler_type: 'ruby no AR', duration: duration, collection_size: rows.count)
end
def page_avg(rows)
min = max = sum = 0
rows.each do |value|
max = value if value > max
min = value if value < min
sum += value
end
avg = sum / rows.count
(min + max + avg) / 3
end
def get_page_for_task(task_id)
task_sql = <<-SQL
SELECT tasks.page, tasks.per_page FROM tasks WHERE tasks.id = #{task_id} LIMIT 1
SQL
task_row = ActiveRecord::Base.connection.select_one(task_sql)
page = task_row['page']
per_page = task_row['per_page']
offset = per_page * ((page = page.to_i - 1) < 0 ? 0 : page)
page_sql = <<-SQL
SELECT survey_results.value FROM survey_results LIMIT # {per_page} OFFSET #{offset}
SQL

ActiveRecord::Base.connection.select_values(page_sql)
end
def create_survey_result(task_id, value)
survey_result_sql = <<-SQL
INSERT INTO survey_results (task_id, value) VALUES (#{task_id}, #{value})
SQL
ActiveRecord::Base.connection.insert(survey_result_sql)
end
end

def process_in_background
RubyCalculator.perform_async(id)
RubyCalculatorNoAr.perform_async(id)
end
mkdir $GOPATH/src/golang_survey_calculator
cd $GOPATH/src/golang_survey_calulator
touch .gitignore
POSTGRES_USERNAME=postgres
POSTGRES_PASSWORD=better_generate_your_password
POSTGRES_HOST=localhost
POSTGRES_DATABASE=survey_calculator_development
POSTGRES_PORT=5432
REDIS_URL=redis://localhost:6379/0
.env
*.bin
...privatedef process_in_background
RubyCalculator.perform_async(id)
RubyCalculatorNoAr.perform_async(id)
Sidekiq::Client.push(‘class’ => ‘GoSurveyCalculator’, ‘args’ => [id], ‘queue’ => ‘go_survey_calculator’)
end
...
package main
import (
“fmt”
_ “github.com/joho/godotenv/autoload”
“github.com/benmanns/goworker”
)
func init() {
registerWorker()
}
func main() {
if err := goworker.Work(); err != nil {
fmt.Println(“Error:”, err)
}
}
package main
import (
“os”
“fmt”
“time”
“github.com/benmanns/goworker”
)
func registerWorker() {
redisUrl := os.Getenv(“REDIS_URL”)
workerSettings := goworker.WorkerSettings{
URI: redisUrl,
Connections: 100,
Queues: []string{“go_survey_calculator”},
Concurrency: 2,
Namespace: “”,
Interval: 5.0,
}
goworker.SetSettings(workerSettings)
goworker.Register(“GoSurveyCalculator”, surveyCalculatorWorker)
}
func surveyCalculatorWorker(queue string, args …interface{}) error {
taskId := int64(args[0].(float64))
fmt.Println(“Recieved task with id “, taskId)
return nil
}
{
“task”: {
“page”: 1,
“per_page”: 1
}
}
package main
import (
"os"
"fmt"
"database/sql"
_ "github.com/lib/pq"
)
var db *sql.DB
var err error
func connectToDb() {
user := os.Getenv("POSTGRES_USERNAME")
password := os.Getenv("POSTGRES_PASSWORD")
dbname := os.Getenv("POSTGRES_DATABASE")
host := os.Getenv("POSTGRES_HOST")
port := os.Getenv("POSTGRES_PORT")
connStr := fmt.Sprintf("user=%s password=%s dbname=%s host=%s port=%s sslmode=disable", user, password, dbname, host, port)
db, err = sql.Open("postgres", connStr)
if err != nil {
fmt.Println(err)
}
if err = db.Ping(); err != nil {
fmt.Println(err)
}
}
package main
import (
“fmt”
_ “github.com/joho/godotenv/autoload”
“github.com/benmanns/goworker”
)
func init() {
connectToDb()
registerWorker()
}
func main() {
if err := goworker.Work(); err != nil {
fmt.Println(“Error:”, err)
}
}
package main
import (
“fmt”
“database/sql”
)
type Task struct {
page int
perPage int
}
func (task *Task)Find(id sql.NullInt64) (*Task) {
row := db.QueryRow(“SELECT page, per_page FROM tasks WHERE id = $1”, id)
t := new(Task)
err := row.Scan(&t.page, &t.perPage)
if err == sql.ErrNoRows {
fmt.Println(“task not found”)
} else if err != nil {
fmt.Println(“Task scan error:”, err)
}
if err != nil {
fmt.Println(err)
}
return t
}
package main
import (
“fmt”
“database/sql”
)
type SurveyResult struct {
value float64
taskId sql.NullInt64
}
func (surveyResult *SurveyResult) Page(page, perPage int) []*SurveyResult {
offset := (page — 1) * perPage
rows, err := db.Query(“SELECT survey_results.value, survey_results.task_id FROM survey_results LIMIT $1 OFFSET $2”, perPage, offset)
if err != nil {
fmt.Println(“DB error: “, err)
}
defer rows.Close() surveyResults := make([]*SurveyResult, 0) for rows.Next() {
sr := new(SurveyResult)
err := rows.Scan(&sr.value, &sr.taskId)
if err != nil {
fmt.Println(“Survey Result Scan Error “, err)
}
surveyResults = append(surveyResults, sr)
}
return surveyResults
}
func (surveyResult *SurveyResult) Save() {
_, err := db.Exec(“INSERT INTO survey_results (value, task_id) VALUES ($1, $2)”, surveyResult.value, surveyResult.taskId)
if err != nil {
fmt.Println(“Result creation error “, err)
}
}
package main
import (
"fmt"
"database/sql"
)
type Statistic struct {
taskId sql.NullInt64
handlerType string
collectionSize int
duration float64
}
func (statistic *Statistic) Save() {
_, err := db.Exec("INSERT INTO statistics (task_id, handler_type, collection_size, duration) VALUES ($1, $2, $3, $4)", statistic.taskId, statistic.handlerType, statistic.collectionSize, statistic.duration)
if err != nil {
fmt.Println("Statistic create error", err)
}
}
package main
import (
"os"
"fmt"
"time"
"database/sql"
"github.com/benmanns/goworker"
)
func registerWorker() {
redisUrl := os.Getenv("REDIS_URL")
workerSettings := goworker.WorkerSettings{
URI: redisUrl,
Connections: 100,
Queues: []string{"go_survey_calculator"},
Concurrency: 2,
Namespace: "",
Interval: 5.0,
}

goworker.SetSettings(workerSettings)
goworker.Register("GoSurveyCalculator", surveyCalculatorWorker)
}
func surveyCalculatorWorker(queue string, args ...interface{}) error {
startTime := time.Now()
taskId := sql.NullInt64{Int64: int64(args[0].(float64)), Valid: true}
task := new(Task).Find(taskId)
surveyResults := new(SurveyResult).Page(task.page, task.perPage)
avg := avgForSurvey(surveyResults)
// create new survey result
newResult := new(SurveyResult)
newResult.value = avg
newResult.taskId = taskId
newResult.Save()
endTime := time.Now()
duration := endTime.Sub(startTime)

// write statistic to db
statistic := new(Statistic)
statistic.duration = duration.Seconds()
statistic.taskId = taskId
statistic.handlerType = "go_worker"
statistic.collectionSize = task.perPage
statistic.Save() fmt.Println("Completed task", taskId.Int64)
return nil
}
func avgForSurvey(surveyResults []*SurveyResult) (float64) {
min, max, sum := 0.0, 0.0, 0.0
for _, element := range surveyResults {
if element.value < min {
min = element.value
}
if element.value > max {
max = element.value
}
sum = sum + element.value
}
avg := sum / float64(len(surveyResults)) return (min + max + avg) / 3
}
{
“task”: {
“page”: 1,
“per_page”: 1
}
}
{
“task”: {
“page”: 1,
“per_page”: 1
}
}
[
{
"handler_type": "go_worker",
"collection_size": 1,
"duration": 0.012798721
},
{
"handler_type": "ruby no AR",
"collection_size": 1,
"duration": 0.02012
},
{
"handler_type": "ruby",
"collection_size": 1,
"duration": 0.023259
}
]
{
“task”: {
“page”: 1,
“per_page”: 10
}
}
[
{
“handler_type”: “go_worker”,
“collection_size”: 10,
“duration”: 0.010675476
},
{
“handler_type”: “ruby no AR”,
“collection_size”: 10,
“duration”: 0.019779
},
{
“handler_type”: “ruby”,
“collection_size”: 10,
“duration”: 0.020584
}
]

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade