Cuando la fuente de datos no tiene protocolos soportados de forma estándar y necesita personalización, hemos de desarrollar un conector que permita la adquisición de los datos deseados. En este artículo vamos a ver cómo crear uno de estos conectores de forma sencilla y rápida mediante scripting, y cómo integrarlo en el Prozzie de forma que la información fluya de la misma forma que si se tratara de protocolos ya soportados.

Juan Jesús Prieto

WDP junto con su parte recolectora, el Prozzie, forman una solución suficientemente completa para muchas de las situaciones donde se requiere el uso de protocolos estándares muy extendidos, como son mqtt, netflow o snmp, por poner algunos ejemplos. Sin embargo en muchas ocasiones me encuentro que la fuente de datos que necesito consultar no corresponde con ninguna de las soportadas oficialmente o requiere de algún tratamiento especializado.

Un ejemplo de este tipo de fuente de datos puede ser la consulta a un servicio basado en una API particular, peticiones a bases de datos de distintos fabricantes, o la activación mediante algún protocolo específico del Northbound de un sistema externo (por ejemplo, los servicios de localización de algunos fabricantes de dispositivos WiFi). En estas ocasiones hace falta un desarrollo particular que permita dicha conexión.

En este artículo abordaremos un caso muy habitual, que es sencillo de implementar: la consulta a una WAPI (web API) de un servicio externo.

 

Escenarios de procesamiento de streams

Unos de los casos de uso habituales a los que nos enfrentamos al desplegar WDP, es en entornos IoT donde los dispositivos realizan mediciones en sus inmediaciones, por ejemplo, de presión, temperatura o humedad. En estos casos es interesante recopilar la información meteorológica ambiental y, si es posible, el pronóstico para los próximos días, de manera que se puedan tomar decisiones en base a estos datos.

Imaginemos el caso particular de una placa solar con dos sensores de temperatura, uno para la propia placa y otro para el depósito. Para un correcto control del sistema es importante obtener la temperatura ambiente de la zona y su pronóstico, de manera que se puedan ajustar los momentos en que la bomba del conjunto placa-depósito se active o desactive en base al diferencial de temperatura, que naturalmente puede ser variable.

Para abordar esto, el Prozzie, además de recibir los datos de temperatura de los dos sensores vía mqtt (por poner un ejemplo), deberá recabar la información de la temperatura ambiental conectándose a un servicio público mediante una WAPI.

Hay muchos servicios de este tipo, y uno muy extendido, que permite consultas de manera gratuita hasta ciertos límites razonables, es https://openweathermap.org, que tiene una WAPI pública, muy bien documentada.

 

La implementación

Para estos casos, lo mejor es crear un pequeño programa, en el lenguaje que más cómodo se sienta uno, y empaquetarlo en formato docker, de manera que se integre bien con el software sobre el que funciona el Prozzie.

En nuestro caso, se trata de un script en Ruby que, básicamente, construye la query que se lanza a la WAPI de openweathermap para inyectarlo en nuestro broker de kafka a un topic determinado, una vez obtenido el dato en formato JSON. Este proceso se repite con una espera intermedia que deberá ser, al menos, el mínimo obligatorio que corresponde a la licencia del servicio.

Ésta es una versión sencilla del script, para este artículo:

openweathermap2k.rb:

#!/usr/bin/env ruby

require 'net/http'
require 'json'
require 'kafka'


STDOUT.sync = true
@name = "openweathermap2k"
lat = ENV['LATITUDE']
lon = ENV['LONGITUDE']
apikey = ENV['APIKEY']
url = ENV['URL'].nil? ? "http://api.openweathermap.org/data/2.5/weather?lat=#{lat}&lon=#{lon}&units=metric&appid=#{apikey}" : ENV['URL']
@time = ENV['TIME'].nil? ? 900 : ENV['TIME'].to_i # minimum 10 minutes, defaults to 15 minutes
@time = 600 if @time < 600 kafka_broker = ENV['KAFKA_BROKER'].nil? ? "127.0.0.1" : ENV['KAFKA_BROKER'] kafka_port = ENV['KAFKA_PORT'].nil? ? "9092" : ENV['KAFKA_PORT'] @kafka_topic = ENV['KAFKA_TOPIC'].nil? ? "openweathermap" : ENV['KAFKA_TOPIC'] kclient = Kafka.new(seed_brokers: ["#{kafka_broker}:#{kafka_port}"], client_id: "openweathermap2k") def w2k(url,kclient) puts "[#{@name}] Starting openweathermap thread" while true begin puts "[#{@name}] Connecting to #{url}" unless ENV['DEBUG'].nil? openw = Net::HTTP.get(URI(url)) openwhash = JSON.parse(openw) openwhash["timestamp"] = Time.now.to_i puts "openweathermap event: #{openwhash.to_json}n" unless ENV['DEBUG'].nil? kclient.deliver_message("#{openwhash.to_json}",topic: @kafka_topic) sleep @time rescue Exception => e
            puts "Exception: #{e.message}"
        end
    end

end

Signal.trap('INT') { throw :sigint }

catch :sigint do
        t1 = Thread.new{w2k(url,kclient)}
        t1.join
end

puts "Exiting from openweathermap2"

Un ejemplo de lo que se puede obtener mediante una query a la API de openweathermap sería el siguiente:

# curl 'http://api.openweathermap.org/data/2.5/weather?lat=37.345&lon=-6.072&units=metric&appid=abc123abc123’ | jq . 
{
  "coord": {
    "lon": -6.07,
    "lat": 37.35
  },
  "weather": [
    {
      "id": 800,
      "main": "Clear",
      "description": "clear sky",
      "icon": "01d"
    }
  ],
  "base": "stations",
  "main": {
    "temp": 31.69,
    "pressure": 1018,
    "humidity": 45,
    "temp_min": 31,
    "temp_max": 32
  },
  "visibility": 10000,
  "wind": {
    "speed": 1.5,
    "deg": 110
  },
  "clouds": {
    "all": 0
  },
  "dt": 1537531200,
  "sys": {
    "type": 1,
    "id": 5511,
    "message": 0.0029,
    "country": "ES",
    "sunrise": 1537510293,
    "sunset": 1537554134
  },
  "id": 2514287,
  "name": "Mairena del Aljarafe",
  "cod": 200
}

Como se puede observar, el servicio da mucha información, quizás más de la necesaria. No obstante, nuestra intención no es poner ninguna lógica en el lado del Prozzie, que debería limitarse a recopilar datos. Preferimos insertar el dato tal cual está y, posteriormente, seleccionar sólo las claves interesantes para nuestras métricas y KPIs, mediante un procesado en el normalizador (otro elemento de WDP).

El script se puede ubicar en un directorio cualquiera del Prozzie, por ejemplo /usr/src/openweathermap, pero con la condición de poner dos archivos más en el mismo directorio: Dockerfile y Gemfile, que han de tener los siguientes contenidos:

Gemfile:

source 'https://rubygems.org'
gem 'http'
gem 'json'
gem 'ruby-kafka'

Dockerfile:

FROM alpine:3.7 as builder
MAINTAINER wizzie systems <systems@wizzie.io>

ENV BUILD_PACKAGES bash curl-dev ruby-dev build-base
ENV RUBY_PACKAGES ruby ruby-bundler

RUN apk add --no-cache $BUILD_PACKAGES $RUBY_PACKAGES && 
    mkdir /usr/app 

WORKDIR /usr/app

COPY Gemfile /usr/app/
RUN bundle install

FROM alpine:3.7

RUN apk add --no-cache ruby

COPY --from=builder /usr/lib/ruby /usr/lib/ruby
COPY ./openweathermap2k.rb /usr/local/bin
ENTRYPOINT ["/usr/local/bin/openweathermap2k.rb"]

El primero lo necesita bundle para instalar las gemas necesarias para el funcionamiento del script. El segundo es el archivo Dockerfile que buscará el constructor de imágenes de docker (docker build) para crear la imagen desde la que se instanciará el contenedor. Un detalle importante a tener en cuenta es que se requiere de una versión de docker lo más actualizada posible (al menos docker 17.05 o superior) para que acepte la opción Multistage build que se usa en este Dockerfile.

Así pues, con estos tres archivos ubicados juntos en el mismo directorio, ejecutamos un build desde ahí:

# docker build -t test/openweathremap2k .
Sending build context to Docker daemon  5.632kB
Step 1/13 : FROM alpine:3.7 as builder
 ---> 34ea7509dcad
...
Step 13/13 : ENTRYPOINT ["/usr/local/bin/openweathermap2k.rb"]
 ---> Running in 4d85d974a170
Removing intermediate container 4d85d974a170
 ---> 962722eae958
Successfully built 962722eae958
Successfully tagged test/openweathremap2k:latest

El proceso es muy rápido y sencillo, y nos permite obtener una imagen de menos de 40MB que podemos ejecutar en cualquier sistema que soporte dockers. Como viene siendo habitual cuando usamos este sistema, los parámetros de configuración se pasan a través de variables de entorno:

  • KAFKA_BROKER: dirección IP del broker de kafka al que conectar, que en el Prozzie suele ser la IP principal de gestión.
  • KAFKA_PORT: puerto TCP del broker de kafka, por defecto 9092 (y por tanto opcional).
  • KAFKA_TOPIC: topic de kafka al que suscribirse para publicar, por defecto openweathermap.
  • LATITUDE and LONGITUDE: coordenadas de la posición sobre la que se desean obtener los valores de las condiciones ambientales, en radianes. Por ejemplo, Sevilla sería LATITUDE=37.3755, LONGITUDE=-6.0251.
  • URL: dirección URL completa si se quiere usar una alternativa a la que el programa ofrece (con lo que LONGITUDE y LATITUDE no se aplicarían).
  • TIME: tiempo de espera entre cada petición, en segundos. Por defecto son 900 segundos (15 minutos) y como mínimo 600 segundos (10 minutos).
  • APIKEY: clave a usar para realizar las queries y que se obtiene cuando nos registramos en el servicio.

Una vez que tengamos los datos necesarios, podemos hacer una primera ejecución de prueba:

# docker run --env DEBUG=1 --env APIKEY="abc123abc123abc123" --env LONGITUDE="-6.025" --env LATITUDE="37.39" --env KAFKA_BROKER="10.128.15.51" --env KAFKA_PORT="9092" --network prozzie_default -it test/openweathremap2k
[openweathermap2k] Starting openweathermap thread
[openweathermap2k] Connecting to http://api.openweathermap.org/data/2.5/weather?lat=37.39&lon=-6.025&units=metric&appid=abc123abc123abc123
openweathermap event: {"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":31.54,"pressure":1021,"humidity":29,"temp_min":31,"temp_max":32},"visibility":10000,"wind":{"speed":3.6,"deg":50},"clouds":{"all":0},"dt":1537696800,"sys":{"type":1,"id":5511,"message":0.0104,"country":"ES","sunrise":1537683179,"sunset":1537726747},"id":6360976,"name":"Camas","cod":200,"timestamp":1537698909}
^C

# prozzie kafka consume openweathermap --from-beginning
{"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":31.54,"pressure":1021,"humidity":29,"temp_min":31,"temp_max":32},"visibility":10000,"wind":{"speed":3.6,"deg":50},"clouds":{"all":0},"dt":1537696800,"sys":{"type":1,"id":5511,"message":0.0104,"country":"ES","sunrise":1537683179,"sunset":1537726747},"id":6360976,"name":"Camas","cod":200,"timestamp":1537698909}
^C

Es importante poner la opción “–network prozzie_default” en la ejecución del docker para que el contenedor pueda alcanzar al broker de kafka. Si todo ha salido correctamente, podemos ejecutar el contenedor con la opción detach.

# docker run -d --env APIKEY="abc123abc123abc123" --env LONGITUDE="-6.025" --env LATITUDE="37.375" --env KAFKA_BROKER="10.128.15.51" --env KAFKA_PORT="9092" --network prozzie_default -it test/openweathremap2k
7f5e5dea4a5b730b32e851cd821cad09499df83060ef37d191c3087fa5630723

A partir de este momento, empezarán a aparecer mensajes en el topic openweathermap cada 15 minutos para la localización que hemos indicado.

# prozzie kafka consume openweathermap 
{"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":32,"pressure":1021,"humidity":27,"temp_min":32,"temp_max":32},"visibility":10000,"wind":{"speed":3.1,"deg":60},"clouds":{"all":0},"dt":1537698600,"sys":{"type":1,"id":5511,"message":0.003,"country":"ES","sunrise":1537683181,"sunset":1537726745},"id":6360976,"name":"Camas","cod":200,"timestamp":1537700674}
{"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":33.54,"pressure":1020,"humidity":26,"temp_min":33,"temp_max":34},"visibility":10000,"wind":{"speed":4.6,"deg":70},"clouds":{"all":0},"dt":1537700400,"sys":{"type":1,"id":5511,"message":0.3452,"country":"ES","sunrise":1537683181,"sunset":1537726744},"id":6360976,"name":"Camas","cod":200,"timestamp":1537701575}

Conclusiones

Como resumen final, queremos indicar que la creación de conectores para el Prozzie se puede realizar de muchas y variadas formas, todas perfectamente válidas. Hemos visto una de ellas, creando un script en Ruby que empaquetamos en formato docker para lanzarlo como un contenedor más, realizando queries cada 15 minutos y enviando el dato adquirido a un topic de Kafka para que, posteriormente, alcance el pipeline de WDP.

El flujo principal de estos datos se puede potenciar con los nuevos datos y así mejorar la información mediante procesos de normalización y de enriquecimiento.

En artículos próximos veremos este proceso con más detenimiento.

We hope you have found this small tutorial useful. Please don’t hesitate to contact us at technology@wizzie.io if you need further information.

Share This