When the data source turns out to have no supported standard protocol out of the box and requires a lot of customization, we need to develop a connector that allows the acquisition of the desired data. In this article we are going to see how to create such a connector easily and quickly through scripting, and how to integrate it into the Prozzie so that the information flows exactly as if it came through one of the already supported protocols.

Juan Jesús Prieto

WDP (Wizzie Data Platform), together with its data collection counterpart, the so-called “Prozzie”, forms a solution complete enough for most situations requiring standard protocols such as MQTT, NetFlow or SNMP, to mention a few. However, on other occasions we may find that the data source we need to query does not correspond to any of the protocols supported out of the box, or requires some specialized treatment.

Examples of this type of data source include querying a service that exposes a particular API, making requests to databases from different vendors, or triggering, by means of some specific protocol, the northbound interface of an external system (for example, the location services of certain WiFi device manufacturers). In all these cases, a special development that allows these connections is needed.

In this article we are going to deal with a very common case, which is also easy to implement: querying the WAPI (web API) of an external service.


Stream Processing Scenarios

One of the usual cases we face when deploying WDP is IoT environments where devices take measurements of their surroundings, for example pressure, temperature or humidity. In these cases it is interesting to gather environmental and meteorological information and, if possible, the forecast for the next few days, so that decisions can be made based on this information.

Imagine the specific case of a solar panel with two temperature sensors, one on the panel itself and another on the tank. For appropriate control of the system it is important to obtain the ambient temperature of the area and its forecast, so that the pump of the panel-tank assembly can be activated or deactivated based on the temperature differential, which can naturally be adjusted or made variable.
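
To make the scenario concrete, the control rule could be a simple differential thermostat with hysteresis. The following Ruby sketch is for illustration only; the threshold values and the pump_state helper are assumptions, not part of any Wizzie component:

# Hypothetical thresholds, assumed for illustration
DIFF_ON  = 6.0 # °C: differential at which the pump starts
DIFF_OFF = 2.0 # °C: differential at which the pump stops

# Returns the new pump state from both temperatures and the current state
def pump_state(panel_temp, tank_temp, running)
  diff = panel_temp - tank_temp
  return true  if diff >= DIFF_ON
  return false if diff <= DIFF_OFF
  running # hysteresis: keep the current state between both thresholds
end

pump_state(55.0, 48.0, false) # => true, a 7 °C differential starts the pump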

To address this, the Prozzie, in addition to receiving the temperature data from the two sensors via MQTT (for example), has to collect the ambient temperature by connecting to a public service through a WAPI.

There are many services of this type. A very widespread one, which allows free queries up to certain reasonable limits, is https://openweathermap.org, and it has a very well documented public WAPI.


The Implementation

For these particular cases, it is best to create a small program in the language you feel most comfortable with and package it as a Docker image, so that it integrates well with the software on which Prozzie runs.

In our case it is a Ruby script that builds the query, sends it to the openweathermap WAPI and, once the data has been obtained in JSON format, injects it into a specific topic on our Kafka broker. The process repeats with a waiting period that must be at least the minimum mandated by the service license.

This is a simple version of the script, for this article:

openweathermap2k.rb:

#!/usr/bin/env ruby

require 'net/http'
require 'json'
require 'kafka'


STDOUT.sync = true # flush stdout immediately so docker logs shows output in real time
@name = "openweathermap2k"

# Configuration through environment variables
lat = ENV['LATITUDE']
lon = ENV['LONGITUDE']
apikey = ENV['APIKEY']
url = ENV['URL'].nil? ? "http://api.openweathermap.org/data/2.5/weather?lat=#{lat}&lon=#{lon}&units=metric&appid=#{apikey}" : ENV['URL']
@time = ENV['TIME'].nil? ? 900 : ENV['TIME'].to_i # defaults to 15 minutes
@time = 600 if @time < 600                        # enforce the 10-minute minimum
kafka_broker = ENV['KAFKA_BROKER'].nil? ? "127.0.0.1" : ENV['KAFKA_BROKER']
kafka_port = ENV['KAFKA_PORT'].nil? ? "9092" : ENV['KAFKA_PORT']
@kafka_topic = ENV['KAFKA_TOPIC'].nil? ? "openweathermap" : ENV['KAFKA_TOPIC']
kclient = Kafka.new(seed_brokers: ["#{kafka_broker}:#{kafka_port}"], client_id: "openweathermap2k")

# Query the WAPI, stamp the result and publish it to Kafka, forever
def w2k(url, kclient)
    puts "[#{@name}] Starting openweathermap thread"
    loop do
        begin
            puts "[#{@name}] Connecting to #{url}" unless ENV['DEBUG'].nil?
            openw = Net::HTTP.get(URI(url))
            openwhash = JSON.parse(openw)
            openwhash["timestamp"] = Time.now.to_i # add our own reception timestamp
            puts "openweathermap event: #{openwhash.to_json}\n" unless ENV['DEBUG'].nil?
            kclient.deliver_message(openwhash.to_json, topic: @kafka_topic)
        rescue StandardError => e
            # log and keep going: a transient HTTP or Kafka error must not kill the loop
            puts "Exception: #{e.message}"
        end
        sleep @time # wait between requests, also after a failure, to respect the service limits
    end
end


# Handle SIGINT (Ctrl-C) gracefully: the trap unwinds the catch block below
Signal.trap('INT') { throw :sigint }

catch :sigint do
    t1 = Thread.new { w2k(url, kclient) }
    t1.join
end

puts "Exiting from openweathermap2k"

An example of what can be obtained with a query to the openweathermap API would be the following:

# curl 'http://api.openweathermap.org/data/2.5/weather?lat=37.345&lon=-6.072&units=metric&appid=abc123abc123' | jq .
{
  "coord": {
    "lon": -6.07,
    "lat": 37.35
  },
  "weather": [
    {
      "id": 800,
      "main": "Clear",
      "description": "clear sky",
      "icon": "01d"
    }
  ],
  "base": "stations",
  "main": {
    "temp": 31.69,
    "pressure": 1018,
    "humidity": 45,
    "temp_min": 31,
    "temp_max": 32
  },
  "visibility": 10000,
  "wind": {
    "speed": 1.5,
    "deg": 110
  },
  "clouds": {
    "all": 0
  },
  "dt": 1537531200,
  "sys": {
    "type": 1,
    "id": 5511,
    "message": 0.0029,
    "country": "ES",
    "sunrise": 1537510293,
    "sunset": 1537554134
  },
  "id": 2514287,
  "name": "Mairena del Aljarafe",
  "cod": 200
}

As you can see, the service returns a lot of information, perhaps more than necessary. However, our intention is not to put any logic on the Prozzie side, which should be limited to collecting data. We prefer to insert the data as-is and, later, select only the keys that are relevant for our metrics and KPIs by means of some processing in the Normalizer (another WDP module).
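
Just to illustrate the kind of selection the Normalizer will perform later in the pipeline, this is what keeping a few keys could look like in Ruby. The chosen keys and the output format are assumptions made for this example; the field names follow the response shown above:

require 'json'

# One event as it arrives on the topic (trimmed to the relevant fields)
raw_event = '{"main":{"temp":31.69,"pressure":1018,"humidity":45},"timestamp":1537698909}'
event = JSON.parse(raw_event)

# Keep only the keys that matter for our metrics and KPIs
metrics = {
  "temperature" => event.dig("main", "temp"),
  "humidity"    => event.dig("main", "humidity"),
  "pressure"    => event.dig("main", "pressure"),
  "timestamp"   => event["timestamp"]
}
puts metrics.to_json
# => {"temperature":31.69,"humidity":45,"pressure":1018,"timestamp":1537698909}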

The script can be located in any Prozzie directory, for example /usr/src/openweathermap, but we have to place two more files in the same directory, Gemfile and Dockerfile, with the following contents:

Gemfile:

source 'https://rubygems.org'
gem 'http'
gem 'json'
gem 'ruby-kafka'

Dockerfile:

FROM alpine:3.7 as builder
MAINTAINER wizzie systems 

ENV BUILD_PACKAGES bash curl-dev ruby-dev build-base
ENV RUBY_PACKAGES ruby ruby-bundler

RUN apk add --no-cache $BUILD_PACKAGES $RUBY_PACKAGES && \
    mkdir /usr/app 

WORKDIR /usr/app

COPY Gemfile /usr/app/
RUN bundle install

FROM alpine:3.7

RUN apk add --no-cache ruby

COPY --from=builder /usr/lib/ruby /usr/lib/ruby
COPY ./openweathermap2k.rb /usr/local/bin
ENTRYPOINT ["/usr/local/bin/openweathermap2k.rb"]

The first is needed by bundle to install the gems the script requires. The second is the Dockerfile that docker build will use to create the image from which the container will be instantiated. An important detail to keep in mind is that you need a version of Docker as up to date as possible (at least 17.05) to support the multi-stage build used in this Dockerfile.

So, with these three files together in the same directory, we can run the build from there:

# docker build -t test/openweathermap2k .
Sending build context to Docker daemon  5.632kB
Step 1/13 : FROM alpine:3.7 as builder
 ---> 34ea7509dcad
...
Step 13/13 : ENTRYPOINT ["/usr/local/bin/openweathermap2k.rb"]
 ---> Running in 4d85d974a170
Removing intermediate container 4d85d974a170
 ---> 962722eae958
Successfully built 962722eae958
Successfully tagged test/openweathermap2k:latest

The process is very fast and simple, and gives us an image of less than 40MB that we can run on any system that supports Docker. As usual with containers, the configuration parameters are passed through environment variables:

  • KAFKA_BROKER: IP address of the kafka broker to connect to, which in the Prozzie is usually the main management IP.
  • KAFKA_PORT: TCP port of the kafka broker, by default 9092 (and therefore optional).
  • KAFKA_TOPIC: Kafka topic where the messages will be published, by default openweathermap.
  • LATITUDE and LONGITUDE: coordinates of the position whose environmental values you want to obtain, in decimal degrees. For example, Sevilla would be LATITUDE = 37.3755, LONGITUDE = -6.0251.
  • URL: full URL, if you want to use a different one from the one the program offers (in this case LONGITUDE and LATITUDE would not apply).
  • TIME: waiting time between requests, in seconds. By default 900 seconds (15 minutes), with a minimum of 600 seconds (10 minutes).
  • APIKEY: key used to perform the queries, obtained when registering at the service. Together with LATITUDE and LONGITUDE it is mandatory unless a custom URL is provided (see the sketch after this list).
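
As a side note, the script shown above does not validate its configuration. A minimal guard, purely a hypothetical addition and not part of the original script, could abort at startup when the mandatory variables are missing:

# Hypothetical startup check: APIKEY, LATITUDE and LONGITUDE are only
# mandatory when no custom URL is provided
if ENV['URL'].nil?
  %w[APIKEY LATITUDE LONGITUDE].each do |var|
    abort "[openweathermap2k] missing environment variable #{var}" if ENV[var].nil?
  end
end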

Once we have the necessary data, we can make a first test run:

# docker run --env DEBUG=1 --env APIKEY="abc123abc123abc123" --env LONGITUDE="-6.025" --env LATITUDE="37.39" --env KAFKA_BROKER="10.128.15.51" --env KAFKA_PORT="9092" --network prozzie_default -it test/openweathermap2k
[openweathermap2k] Starting openweathermap thread
[openweathermap2k] Connecting to http://api.openweathermap.org/data/2.5/weather?lat=37.39&lon=-6.025&units=metric&appid=abc123abc123abc123
openweathermap event: {"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":31.54,"pressure":1021,"humidity":29,"temp_min":31,"temp_max":32},"visibility":10000,"wind":{"speed":3.6,"deg":50},"clouds":{"all":0},"dt":1537696800,"sys":{"type":1,"id":5511,"message":0.0104,"country":"ES","sunrise":1537683179,"sunset":1537726747},"id":6360976,"name":"Camas","cod":200,"timestamp":1537698909}
^C

# prozzie kafka consume openweathermap --from-beginning
{"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":31.54,"pressure":1021,"humidity":29,"temp_min":31,"temp_max":32},"visibility":10000,"wind":{"speed":3.6,"deg":50},"clouds":{"all":0},"dt":1537696800,"sys":{"type":1,"id":5511,"message":0.0104,"country":"ES","sunrise":1537683179,"sunset":1537726747},"id":6360976,"name":"Camas","cod":200,"timestamp":1537698909}
^C

It is important to use the "--network prozzie_default" parameter during the docker call, so that the container can reach the Kafka broker. If everything has worked correctly, we can run the container with the detach option.

# docker run -d --env APIKEY="abc123abc123abc123" --env LONGITUDE="-6.025" --env LATITUDE="37.375" --env KAFKA_BROKER="10.128.15.51" --env KAFKA_PORT="9092" --network prozzie_default -it test/openweathermap2k
7f5e5dea4a5b730b32e851cd821cad09499df83060ef37d191c3087fa5630723

From this moment on, messages will start appearing in the openweathermap topic every 15 minutes for the provided location.

# prozzie kafka consume openweathermap 
{"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":32,"pressure":1021,"humidity":27,"temp_min":32,"temp_max":32},"visibility":10000,"wind":{"speed":3.1,"deg":60},"clouds":{"all":0},"dt":1537698600,"sys":{"type":1,"id":5511,"message":0.003,"country":"ES","sunrise":1537683181,"sunset":1537726745},"id":6360976,"name":"Camas","cod":200,"timestamp":1537700674}
{"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":33.54,"pressure":1020,"humidity":26,"temp_min":33,"temp_max":34},"visibility":10000,"wind":{"speed":4.6,"deg":70},"clouds":{"all":0},"dt":1537700400,"sys":{"type":1,"id":5511,"message":0.3452,"country":"ES","sunrise":1537683181,"sunset":1537726744},"id":6360976,"name":"Camas","cod":200,"timestamp":1537701575}

Conclusions

To conclude, we want to point out that connectors for the Prozzie can be created in many and varied ways, all perfectly valid. We have seen one of them: a Ruby script packaged as a Docker container that queries a WAPI every 15 minutes and sends the acquired data to a Kafka topic so that, later, it can reach the WDP pipeline.

The main data flow can thus be enhanced with additional data, improving the final information through normalization and enrichment processes.

In upcoming articles we will see this in more detail.

We hope you have found this small tutorial useful. Please don’t hesitate to contact us at technology@wizzie.io if you need further information.
