Examples of this type of data source include queries to a service that exposes a particular API, requests to databases from different vendors, or calls over some specific protocol to the northbound interface of an external system (for example, the location services of certain WiFi device manufacturers). In all these cases, a custom development is needed to make the connection possible.
In this article we will deal with a very common case that is easy to implement: querying the WAPI (web API) of an external service.
Stream Processing Scenarios
One of the usual cases we face when deploying WDP is an IoT environment where devices take measurements of their surroundings, for example pressure, temperature or humidity. In these cases it is useful to gather environmental and meteorological information and, if possible, the forecast for the next few days, so that decisions can be made based on this information.
Imagine the specific case of a solar panel with two temperature sensors, one for the panel itself and another for the tank. For proper control of the system it is important to obtain the ambient temperature of the area and its forecast, so that the pump of the panel-tank assembly can be activated or deactivated based on the temperature differential, which can naturally be made adjustable.
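As an illustration of that control rule (this logic would live on the decision-making side, not in the Prozzie, and the threshold values below are made up), a simple hysteresis check in Ruby could look like this:

# Hypothetical hysteresis rule for the panel-tank pump (illustration only)
ON_DIFF  = 8.0 # °C: start the pump when the panel is this much hotter than the tank
OFF_DIFF = 2.0 # °C: stop the pump once the differential drops to this value

def pump_on?(panel_temp, tank_temp, currently_on)
  diff = panel_temp - tank_temp
  return true  if diff >= ON_DIFF   # enough differential: run the pump
  return false if diff <= OFF_DIFF  # differential exhausted: stop it
  currently_on                      # in between: keep the previous state
end

puts pump_on?(55.0, 40.0, false) # => true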
To address this, the Prozzie, in addition to receiving the temperature data of the two sensors via MQTT (for example), has to collect the ambient temperature information by connecting to a public service through a WAPI.
There are many services of this type. A very widespread one, which allows free queries up to certain reasonable limits, is https://openweathermap.org, which has a very well documented public WAPI.
The Implementation
For these particular cases, it is best to write a small program in the language you feel most comfortable with and package it as a Docker image, so that it integrates well with the software stack on which Prozzie runs.
In our case it is a Ruby script that builds the query, sends it to the openweathermap WAPI and, once the data has been obtained in JSON format, injects it into our Kafka broker under a specific topic. The process repeats with a waiting period that must be at least the minimum required by the service license.
This is a simplified version of the script for this article:
openweathermap2k.rb:
#!/usr/bin/env ruby

require 'net/http'
require 'json'
require 'kafka'

STDOUT.sync = true

@name = "openweathermap2k"

# Configuration through environment variables
lat = ENV['LATITUDE']
lon = ENV['LONGITUDE']
apikey = ENV['APIKEY']
url = ENV['URL'].nil? ? "http://api.openweathermap.org/data/2.5/weather?lat=#{lat}&lon=#{lon}&units=metric&appid=#{apikey}" : ENV['URL']
@time = ENV['TIME'].nil? ? 900 : ENV['TIME'].to_i # defaults to 15 minutes
@time = 600 if @time < 600                        # enforce the 10-minute minimum
kafka_broker = ENV['KAFKA_BROKER'].nil? ? "127.0.0.1" : ENV['KAFKA_BROKER']
kafka_port = ENV['KAFKA_PORT'].nil? ? "9092" : ENV['KAFKA_PORT']
@kafka_topic = ENV['KAFKA_TOPIC'].nil? ? "openweathermap" : ENV['KAFKA_TOPIC']

kclient = Kafka.new(seed_brokers: ["#{kafka_broker}:#{kafka_port}"], client_id: "openweathermap2k")

# Query the WAPI, timestamp the event, publish it to Kafka and wait
def w2k(url, kclient)
  puts "[#{@name}] Starting openweathermap thread"
  loop do
    begin
      puts "[#{@name}] Connecting to #{url}" unless ENV['DEBUG'].nil?
      openw = Net::HTTP.get(URI(url))
      openwhash = JSON.parse(openw)
      openwhash["timestamp"] = Time.now.to_i
      puts "openweathermap event: #{openwhash.to_json}\n" unless ENV['DEBUG'].nil?
      kclient.deliver_message(openwhash.to_json, topic: @kafka_topic)
    rescue StandardError => e
      puts "Exception: #{e.message}"
    end
    sleep @time # wait also after an error, to avoid hammering the service
  end
end

# Exit cleanly on Ctrl-C
Signal.trap('INT') { throw :sigint }

catch :sigint do
  t1 = Thread.new { w2k(url, kclient) }
  t1.join
end

puts "Exiting from openweathermap2k"
An example of what can be obtained with a query to the openweathermap API would be the following:
# curl 'http://api.openweathermap.org/data/2.5/weather?lat=37.345&lon=-6.072&units=metric&appid=abc123abc123' | jq .
{
  "coord": {
    "lon": -6.07,
    "lat": 37.35
  },
  "weather": [
    {
      "id": 800,
      "main": "Clear",
      "description": "clear sky",
      "icon": "01d"
    }
  ],
  "base": "stations",
  "main": {
    "temp": 31.69,
    "pressure": 1018,
    "humidity": 45,
    "temp_min": 31,
    "temp_max": 32
  },
  "visibility": 10000,
  "wind": {
    "speed": 1.5,
    "deg": 110
  },
  "clouds": {
    "all": 0
  },
  "dt": 1537531200,
  "sys": {
    "type": 1,
    "id": 5511,
    "message": 0.0029,
    "country": "ES",
    "sunrise": 1537510293,
    "sunset": 1537554134
  },
  "id": 2514287,
  "name": "Mairena del Aljarafe",
  "cod": 200
}
As you can see, the service returns a lot of information, perhaps more than necessary. However, our intention isn’t to put any logic on the Prozzie side, which should be limited to collecting data. We prefer to insert the data as-is and, later, select only the keys that are relevant for our metrics and KPIs by means of some processing in the Normalizer (another WDP module).
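The selection itself will happen later in the Normalizer, but just to illustrate the idea in Ruby terms, keeping only a few keys of the raw event (key names taken from the sample response above) could look like this:

require 'json'

# Raw event as produced by the script, trimmed here for brevity
raw = '{"main":{"temp":31.69,"pressure":1018,"humidity":45},"wind":{"speed":1.5,"deg":110},"timestamp":1537698909}'
event = JSON.parse(raw)

# Keep only the keys that feed our metrics and KPIs
slim = {
  "temp"       => event.dig("main", "temp"),
  "humidity"   => event.dig("main", "humidity"),
  "pressure"   => event.dig("main", "pressure"),
  "wind_speed" => event.dig("wind", "speed"),
  "timestamp"  => event["timestamp"]
}

puts slim.to_json
# => {"temp":31.69,"humidity":45,"pressure":1018,"wind_speed":1.5,"timestamp":1537698909}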
The script can be located in any Prozzie directory, for example /usr/src/openweathermap, but we have to put two more files in the same directory: Dockerfile and Gemfile, which should have the following contents:
Gemfile:
source 'https://rubygems.org'

gem 'http'
gem 'json'
gem 'ruby-kafka'
Dockerfile:
FROM alpine:3.7 as builder
MAINTAINER wizzie systems

ENV BUILD_PACKAGES bash curl-dev ruby-dev build-base
ENV RUBY_PACKAGES ruby ruby-bundler

RUN apk add --no-cache $BUILD_PACKAGES $RUBY_PACKAGES && \
    mkdir /usr/app
WORKDIR /usr/app
COPY Gemfile /usr/app/
RUN bundle install

FROM alpine:3.7
RUN apk add --no-cache ruby
COPY --from=builder /usr/lib/ruby /usr/lib/ruby
COPY ./openweathermap2k.rb /usr/local/bin
ENTRYPOINT ["/usr/local/bin/openweathermap2k.rb"]
The first is used by bundler to install the gems the script needs. The second is the Dockerfile that docker build will use to create the image from which the container will be instantiated. An important detail to keep in mind is that you need a reasonably up-to-date version of Docker (at least 17.05) to support the multi-stage build used in this Dockerfile.
So, with these three files located together in the same directory, we can execute a build from there:
# docker build -t test/openweathermap2k .
Sending build context to Docker daemon  5.632kB
Step 1/13 : FROM alpine:3.7 as builder
 ---> 34ea7509dcad
...
Step 13/13 : ENTRYPOINT ["/usr/local/bin/openweathermap2k.rb"]
 ---> Running in 4d85d974a170
Removing intermediate container 4d85d974a170
 ---> 962722eae958
Successfully built 962722eae958
Successfully tagged test/openweathermap2k:latest
The process is very fast and simple, and gives us an image of less than 40MB that we can run on any system that supports Docker. As usual with containers, the configuration parameters are passed through environment variables:
- KAFKA_BROKER: IP address of the Kafka broker to connect to, which in the Prozzie is usually the main management IP.
- KAFKA_PORT: TCP port of the Kafka broker, by default 9092 (and therefore optional).
- KAFKA_TOPIC: Kafka topic to which the events are published, by default openweathermap.
- LATITUDE and LONGITUDE: coordinates of the position for which you want to get the environmental condition values, in decimal degrees. For example, Sevilla would be LATITUDE = 37.3755, LONGITUDE = -6.0251.
- URL: full URL, if you want to use a different one from the one the program builds by default (in this case LONGITUDE and LATITUDE would not apply).
- TIME: waiting time between requests, in seconds. By default 900 seconds (15 minutes), with a minimum of 600 seconds (10 minutes).
- APIKEY: key used to authenticate the queries, obtained when registering with the service.
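Note that the script shown above does not validate these variables: if APIKEY, LATITUDE or LONGITUDE is missing, it will silently build a broken URL. A minimal guard that could be added at the top of the script (an illustrative addition, not part of the version shown above) would be:

# Fail fast if a mandatory variable is missing (only when URL is not given)
if ENV['URL'].nil?
  %w[APIKEY LATITUDE LONGITUDE].each do |var|
    abort "[openweathermap2k] missing mandatory environment variable #{var}" if ENV[var].nil?
  end
end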
Once we have the necessary data, we can make a first test run:
# docker run --env DEBUG=1 --env APIKEY="abc123abc123abc123" --env LONGITUDE="-6.025" --env LATITUDE="37.39" --env KAFKA_BROKER="10.128.15.51" --env KAFKA_PORT="9092" --network prozzie_default -it test/openweathermap2k
[openweathermap2k] Starting openweathermap thread
[openweathermap2k] Connecting to http://api.openweathermap.org/data/2.5/weather?lat=37.39&lon=-6.025&units=metric&appid=abc123abc123abc123
openweathermap event: {"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":31.54,"pressure":1021,"humidity":29,"temp_min":31,"temp_max":32},"visibility":10000,"wind":{"speed":3.6,"deg":50},"clouds":{"all":0},"dt":1537696800,"sys":{"type":1,"id":5511,"message":0.0104,"country":"ES","sunrise":1537683179,"sunset":1537726747},"id":6360976,"name":"Camas","cod":200,"timestamp":1537698909}
^C
# prozzie kafka consume openweathermap --from-beginning
{"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":31.54,"pressure":1021,"humidity":29,"temp_min":31,"temp_max":32},"visibility":10000,"wind":{"speed":3.6,"deg":50},"clouds":{"all":0},"dt":1537696800,"sys":{"type":1,"id":5511,"message":0.0104,"country":"ES","sunrise":1537683179,"sunset":1537726747},"id":6360976,"name":"Camas","cod":200,"timestamp":1537698909}
^C
It is important to use the "--network prozzie_default" parameter in the docker call, so that the container can reach the Kafka broker. If everything has worked correctly, we can run the container with the detach option:
# docker run -d --env APIKEY="abc123abc123abc123" --env LONGITUDE="-6.025" --env LATITUDE="37.375" --env KAFKA_BROKER="10.128.15.51" --env KAFKA_PORT="9092" --network prozzie_default -it test/openweathermap2k
7f5e5dea4a5b730b32e851cd821cad09499df83060ef37d191c3087fa5630723
From this moment on, messages will start appearing in the openweathermap topic every 15 minutes for the provided location.
# prozzie kafka consume openweathermap
{"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":32,"pressure":1021,"humidity":27,"temp_min":32,"temp_max":32},"visibility":10000,"wind":{"speed":3.1,"deg":60},"clouds":{"all":0},"dt":1537698600,"sys":{"type":1,"id":5511,"message":0.003,"country":"ES","sunrise":1537683181,"sunset":1537726745},"id":6360976,"name":"Camas","cod":200,"timestamp":1537700674}
{"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":33.54,"pressure":1020,"humidity":26,"temp_min":33,"temp_max":34},"visibility":10000,"wind":{"speed":4.6,"deg":70},"clouds":{"all":0},"dt":1537700400,"sys":{"type":1,"id":5511,"message":0.3452,"country":"ES","sunrise":1537683181,"sunset":1537726744},"id":6360976,"name":"Camas","cod":200,"timestamp":1537701575}
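If the prozzie CLI is not at hand, the same check can be done with a few lines of ruby-kafka, the same gem the script uses (broker address taken from the examples above):

require 'kafka'

# Minimal consumer just to verify that events are reaching the topic
kafka = Kafka.new(seed_brokers: ["10.128.15.51:9092"], client_id: "openweathermap-check")
kafka.each_message(topic: "openweathermap") do |message|
  puts message.value # each JSON event as produced by the script
end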
Conclusions
To conclude, we want to point out that connectors for the Prozzie can be created in many and varied ways, all perfectly valid. We have seen one of them: a Ruby script encapsulated as a Docker container that queries an external service every 15 minutes and sends the acquired data to a Kafka topic so that, later, it can reach the WDP pipeline.
The main data flow can thus be enhanced with additional data, improving the final information through normalization and enrichment processes.
In upcoming articles we will see this in more detail.
We hope you have found this small tutorial useful. Please don’t hesitate to contact us at technology@wizzie.io if you need further information.