New ground — Automatic increase of Kafka LVM on GCP

Sorin Tudor
METRO SYSTEMS Romania
Nov 27, 2019

When you start offering Kafka as a service to your clients, controlling the number of partitions and topics that get created is no longer the main issue. Storage becomes the only thing you really need to care about.

Cloud deployment is powerful: you can order as many resources as you like or need. The old limitations of hardware and time are gone; you just need to know how much you need and on which server. With that in mind, we had to come up with a solution that offers the flexibility to increase and decrease storage.

Let’s assume the following configuration of Kafka servers.

We want to add or remove storage on each node without service disruption and in a granular manner.

Since a decrease involves more risks (there are more steps, including filesystem resizing and moving data from one disk to another) and is not done as often as an increase, it will be treated separately and most likely handled as a manual procedure.

As for the increase, it should be easy. If you have a service account on Compute Engine (follow the link for more details: https://cloud.google.com/compute/docs/access/service-accounts) with sufficient rights (a storage admin role and the right to attach disks to the VM), the rest is pretty straightforward.

Google supports a variety of languages for its client libraries; for convenience we will use Python (google-api-python-client).

By default, we build our instances with at least one data drive that is mounted on the machine and used for LVM, named [instance-name]data. The common agreement was that additional disks are suffixed with an incrementing number, e.g. [instance-name]data-1.
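The [instance-name] part is the GCP instance name; in the code below a small helper, construct_gcp_name, maps a hostname to that instance name. A minimal sketch, assuming the instance name is simply the short hostname (the actual helper is not shown in this article):

def construct_gcp_name(host):
    # Assumption: the GCP instance name is the host's short name (FQDN minus the domain).
    return host.split('.')[0]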

In order to know which disks already exist in the cloud and which of them are attached to the machine, we query them and return two separate lists:

from googleapiclient import discovery

# project, zone and credentials are defined at module level (credentials is shown below).
def query_disks(instance_name):
    service = discovery.build('compute', 'v1', credentials=credentials)
    request = service.disks().list(project=project, zone=zone)
    disk_list = []
    instance_disk_list = []
    unmounted_disk_list = []
    # Page through every disk defined in the project/zone.
    while request is not None:
        response = request.execute()
        for disk in response['items']:
            disk_list.append(disk)
        request = service.disks().list_next(
            previous_request=request, previous_response=response)
    # Separate the disks attached to this instance from the ones that are not.
    for i in range(len(disk_list)):
        if ('users' in disk_list[i].keys()) and (
                'https://www.googleapis.com/compute/v1/projects/' +
                project + '/zones/' + zone + '/instances/' +
                instance_name in disk_list[i]['users']):
            instance_disk_list.append(disk_list[i]['selfLink'])
        else:
            unmounted_disk_list.append(disk_list[i]['selfLink'])
    return instance_disk_list, unmounted_disk_list

The reason we do this is to cover the case in which a disk was already ordered but is not yet attached to a machine. The naming sequence is built from all the disks ordered in GCP, not only from the ones already attached to our machines.

Authentication is built from a JSON key file that you download directly from the GCP IAM area of the project, and the code is straightforward (for it to work, the service account needs the permissions mentioned above):

from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    'account.decrypted')

Of course, we are concerned about security, so we store the key file encrypted on the machine and decrypt it with a dedicated key.
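A minimal sketch of that decryption step, assuming the cryptography library's Fernet class, the key handling described at the end of the article, and the account.encrypted / account.decrypted file names used in the other snippets:

from cryptography.fernet import Fernet

key = b'<%= @account_key %>'  # injected by Puppet through ERB, see the end of the article

def decrypt_account_file():
    # Decrypt the service account JSON so the Google client library can read it.
    with open('account.encrypted', 'rb') as enc:
        token = enc.read()
    with open('account.decrypted', 'wb') as dec:
        dec.write(Fernet(key).decrypt(token))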

I will not include the functions that take care of ordering and attaching the disks, because they are simple and very close to the examples available in the GCP documentation. You can also write them to suit your own requirements.
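For reference, here is a minimal sketch of what those two functions could look like, using the compute v1 disks.insert and instances.attachDisk calls; the request bodies and the reuse of a module-level service object are assumptions, not the exact code we run:

def order_disk(disk_name, size_gb):
    # Create a new blank persistent disk in the same project/zone.
    body = {'name': disk_name, 'sizeGb': size_gb}
    service.disks().insert(project=project, zone=zone, body=body).execute()

def attach_disk(instance_name, disk_self_link):
    # Attach an existing disk, referenced by its selfLink, to the instance.
    body = {'source': disk_self_link, 'autoDelete': False}
    service.instances().attachDisk(
        project=project, zone=zone, instance=instance_name, body=body).execute()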

Of more interest is the logic for configuring the actual host, which is implemented in the following function:

import time

def configure_gcp(host):
    gcp_name = construct_gcp_name(host)
    instance_disk_list, unmounted_disk_list = query_disks(gcp_name)
    # If only the initial data disk is attached, the next one is [instance-name]data-1;
    # otherwise increment the numeric suffix of the last attached disk.
    if instance_disk_list[-1] == 'https://www.googleapis.com/compute/v1/projects/' + \
            project + '/zones/' + zone + '/disks/' + gcp_name + 'data':
        new_disk_name = 'https://www.googleapis.com/compute/v1/projects/' + \
            project + '/zones/' + zone + '/disks/' + gcp_name + 'data-1'
    else:
        temp_disk_name = instance_disk_list[-1].split("-")
        temp_disk_name[-1] = str(int(temp_disk_name[-1]) + 1)
        separator = "-"
        new_disk_name = separator.join(temp_disk_name)
    print(new_disk_name)
    disk_to_be_ordered = new_disk_name.split("/")
    # Order the disk only if an unattached one with that name does not already exist.
    if new_disk_name not in unmounted_disk_list:
        order_disk(disk_to_be_ordered[-1], "10")
        time.sleep(120)
        attach_disk(gcp_name, new_disk_name)
    else:
        attach_disk(gcp_name, new_disk_name)
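What actually triggers configure_gcp is not shown in the snippets; since the script ends up running from cron every ten minutes on each node (see the Puppet manifest below), a plausible guard is a simple filesystem-usage check. A minimal sketch, in which the 80% threshold and the /kafka mount point are assumptions:

import shutil
import socket

def main():
    # Assumption: order a new disk only when the Kafka data filesystem is more than
    # 80% full; both the threshold and the /kafka mount point are illustrative.
    usage = shutil.disk_usage('/kafka')
    if usage.used / usage.total > 0.8:
        configure_gcp(socket.getfqdn())

if __name__ == '__main__':
    main()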

Once the disks are successfully attached to the machine, a small shell script is called on each machine from Python, so that the newly available disks are converted to physical volumes, added to the volume group and used to extend the required logical volume (a sketch of that call follows the script below).

#!/bin/bash
. ~/.profile

### Useful variables ###
vg="$(/sbin/vgdisplay | /bin/grep "VG Name" | /usr/bin/awk '{print $3}')"
mapper="$(/sbin/fdisk -l | /bin/grep mapper | /usr/bin/awk '{print $2}' | /bin/sed -e 's/://g' | /bin/grep -v swap)"
lvext="$(/sbin/fdisk -l | /bin/grep mapper | /usr/bin/awk '{print $2}' | /bin/grep -v swap | /bin/sed -e 's/://g')"

# Loop over all block devices except the OS disk (sda), CD-ROM and swap.
for i in `lsblk -S | sort -h | awk '{print $1}' | grep -v sda | grep -v sr | grep -v swap | grep -v NAME`
do
    /sbin/pvdisplay /dev/$i
    if [ $? -eq 0 ]; then
        # The device is already a physical volume, nothing to do.
        /bin/echo -e "\e[92m Nothing to do \e[0m"
    else
        ### Increase the LV
        /bin/echo " "
        /bin/echo -e "There is a new disk: \e[92m $i \e[0m"
        /bin/echo " "
        /usr/bin/timeout 3 sleep 10
        /bin/echo -e "\e[92m Current disk space: `df -h | grep mapper | awk '{print $2}'` \e[0m"
        /bin/echo " "
        /usr/bin/timeout 3 sleep 10
        /bin/echo -e "\e[92m The disk will be increased \e[0m "
        /bin/echo ""
        /usr/bin/timeout 3 sleep 10
        /bin/echo -e "\e[92m Create pv \e[0m"
        /usr/bin/timeout 3 sleep 10
        /sbin/pvcreate /dev/$i
        /bin/echo " "
        /bin/echo -e "\e[92m Extend VG \e[0m"
        /usr/bin/timeout 3 sleep 10
        /sbin/vgextend $vg /dev/$i
        /bin/echo " "
        /bin/echo -e "\e[92m Extend LV \e[0m"
        /usr/bin/timeout 3 sleep 10
        /sbin/lvextend -l+"100"%FREE $lvext
        /bin/echo " "
        /bin/echo -e "\e[92m Resize2fs \e[0m"
        /usr/bin/timeout 3 sleep 10
        /sbin/resize2fs $mapper
        /bin/echo " "
        /bin/echo " "
        /usr/bin/timeout 3 sleep 10
        /bin/echo -e "\e[92m #### Successfully resized #### \e[0m"
        /bin/echo " "
        /bin/echo " "
        /usr/bin/timeout 3 sleep 10
        /bin/echo -e "\e[92m New disk space: `df -h | grep mapper | awk '{print $2}'` \e[0m"
        /bin/echo " "
    fi
done
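As mentioned above, the script is called from Python on each machine; since both files are deployed to /root by the Puppet manifest below, a plain local subprocess call is enough. A minimal sketch (the exact invocation is not shown in the article):

import subprocess

def run_add_disk():
    # Run the LVM extension script locally on the node.
    subprocess.run(['/bin/bash', '/root/add_disk.sh'], check=True)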

Once all this is done, you are good to go; the only thing that remains is to distribute the files to all machines and handle the dependencies.

We do that using the Puppet (https://puppet.com/) manifest below:

class profiles_kafka::fs_increase {

  $account_key = hiera('profiles_kafka::fs_increase::key', 'password')

  package { 'python3-pip':
    ensure => present,
  }

  package { 'google-api-python-client':
    ensure   => present,
    provider => pip3,
  }

  file { 'fs_increase.py':
    path    => '/root/fs_increase.py',
    ensure  => 'present',
    mode    => '0500',
    owner   => 'root',
    group   => 'root',
    content => template("${module_name}/fs_increase.py.erb"),
    replace => 'true',
  }

  file { 'add_disk.sh':
    path    => '/root/add_disk.sh',
    ensure  => 'present',
    mode    => '0500',
    owner   => 'root',
    group   => 'root',
    source  => 'puppet:///modules/profiles_kafka/add_disk.sh',
    replace => 'true',
  }

  file { 'account.encrypted':
    path    => '/root/account.encrypted',
    ensure  => 'present',
    mode    => '0400',
    owner   => 'root',
    group   => 'root',
    source  => 'puppet:///modules/profiles_kafka/account.encrypted',
    replace => 'true',
  }

  cron { 'fs_increase':
    ensure  => present,
    user    => 'root',
    command => "/usr/bin/python3 fs_increase.py",
    minute  => '*/10',
  }
}

Since it's very bad practice to leave your Fernet encryption key in the file, it is stored eyaml-encrypted in Hiera, and the corresponding field in the template is filled in through ERB substitution:

key = b'<%= @account_key %>'

Conclusion:

This was our short foray into the GCP API for disk management. If you have questions, please use the comments; we are glad to get your feedback so that we can improve even further.
