Un ejemplo de este tipo de fuente de datos puede ser la consulta a un servicio basado en una API particular, peticiones a bases de datos de distintos fabricantes, o la activación mediante algún protocolo específico del Northbound de un sistema externo (por ejemplo, los servicios de localización de algunos fabricantes de dispositivos WiFi). En estas ocasiones hace falta un desarrollo particular que permita dicha conexión.
En este artículo abordaremos un caso muy habitual, que es sencillo de implementar: la consulta a una WAPI (web API) de un servicio externo.
Escenarios de procesamiento de streams
Unos de los casos de uso habituales a los que nos enfrentamos al desplegar WDP, es en entornos IoT donde los dispositivos realizan mediciones en sus inmediaciones, por ejemplo, de presión, temperatura o humedad. En estos casos es interesante recopilar la información meteorológica ambiental y, si es posible, el pronóstico para los próximos días, de manera que se puedan tomar decisiones en base a estos datos.
Imaginemos el caso particular de una placa solar con dos sensores de temperatura, uno para la propia placa y otro para el depósito. Para un correcto control del sistema es importante obtener la temperatura ambiente de la zona y su pronóstico, de manera que se puedan ajustar los momentos en que la bomba del conjunto placa-depósito se active o desactive en base al diferencial de temperatura, que naturalmente puede ser variable.
Para abordar esto, el Prozzie, además de recibir los datos de temperatura de los dos sensores vía mqtt (por poner un ejemplo), deberá recabar la información de la temperatura ambiental conectándose a un servicio público mediante una WAPI.
Hay muchos servicios de este tipo, y uno muy extendido, que permite consultas de manera gratuita hasta ciertos límites razonables, es https://openweathermap.org, que tiene una WAPI pública, muy bien documentada.
La implementación
Para estos casos, lo mejor es crear un pequeño programa, en el lenguaje que más cómodo se sienta uno, y empaquetarlo en formato docker, de manera que se integre bien con el software sobre el que funciona el Prozzie.
En nuestro caso, se trata de un script en Ruby que, básicamente, construye la query que se lanza a la WAPI de openweathermap para inyectarlo en nuestro broker de kafka a un topic determinado, una vez obtenido el dato en formato JSON. Este proceso se repite con una espera intermedia que deberá ser, al menos, el mínimo obligatorio que corresponde a la licencia del servicio.
Ésta es una versión sencilla del script, para este artículo:
openweathermap2k.rb:
#!/usr/bin/env ruby require 'net/http' require 'json' require 'kafka' STDOUT.sync = true @name = "openweathermap2k" lat = ENV['LATITUDE'] lon = ENV['LONGITUDE'] apikey = ENV['APIKEY'] url = ENV['URL'].nil? ? "http://api.openweathermap.org/data/2.5/weather?lat=#{lat}&lon=#{lon}&units=metric&appid=#{apikey}" : ENV['URL'] @time = ENV['TIME'].nil? ? 900 : ENV['TIME'].to_i # minimum 10 minutes, defaults to 15 minutes @time = 600 if @time < 600 kafka_broker = ENV['KAFKA_BROKER'].nil? ? "127.0.0.1" : ENV['KAFKA_BROKER'] kafka_port = ENV['KAFKA_PORT'].nil? ? "9092" : ENV['KAFKA_PORT'] @kafka_topic = ENV['KAFKA_TOPIC'].nil? ? "openweathermap" : ENV['KAFKA_TOPIC'] kclient = Kafka.new(seed_brokers: ["#{kafka_broker}:#{kafka_port}"], client_id: "openweathermap2k") def w2k(url,kclient) puts "[#{@name}] Starting openweathermap thread" while true begin puts "[#{@name}] Connecting to #{url}" unless ENV['DEBUG'].nil? openw = Net::HTTP.get(URI(url)) openwhash = JSON.parse(openw) openwhash["timestamp"] = Time.now.to_i puts "openweathermap event: #{openwhash.to_json}n" unless ENV['DEBUG'].nil? kclient.deliver_message("#{openwhash.to_json}",topic: @kafka_topic) sleep @time rescue Exception => e puts "Exception: #{e.message}" end end end Signal.trap('INT') { throw :sigint } catch :sigint do t1 = Thread.new{w2k(url,kclient)} t1.join end puts "Exiting from openweathermap2"
Un ejemplo de lo que se puede obtener mediante una query a la API de openweathermap sería el siguiente:
# curl 'http://api.openweathermap.org/data/2.5/weather?lat=37.345&lon=-6.072&units=metric&appid=abc123abc123’ | jq . { "coord": { "lon": -6.07, "lat": 37.35 }, "weather": [ { "id": 800, "main": "Clear", "description": "clear sky", "icon": "01d" } ], "base": "stations", "main": { "temp": 31.69, "pressure": 1018, "humidity": 45, "temp_min": 31, "temp_max": 32 }, "visibility": 10000, "wind": { "speed": 1.5, "deg": 110 }, "clouds": { "all": 0 }, "dt": 1537531200, "sys": { "type": 1, "id": 5511, "message": 0.0029, "country": "ES", "sunrise": 1537510293, "sunset": 1537554134 }, "id": 2514287, "name": "Mairena del Aljarafe", "cod": 200 }
Como se puede observar, el servicio da mucha información, quizás más de la necesaria. No obstante, nuestra intención no es poner ninguna lógica en el lado del Prozzie, que debería limitarse a recopilar datos. Preferimos insertar el dato tal cual está y, posteriormente, seleccionar sólo las claves interesantes para nuestras métricas y KPIs, mediante un procesado en el normalizador (otro elemento de WDP).
El script se puede ubicar en un directorio cualquiera del Prozzie, por ejemplo /usr/src/openweathermap, pero con la condición de poner dos archivos más en el mismo directorio: Dockerfile y Gemfile, que han de tener los siguientes contenidos:
Gemfile:
source 'https://rubygems.org' gem 'http' gem 'json' gem 'ruby-kafka'
Dockerfile:
FROM alpine:3.7 as builder MAINTAINER wizzie systems <systems@wizzie.io> ENV BUILD_PACKAGES bash curl-dev ruby-dev build-base ENV RUBY_PACKAGES ruby ruby-bundler RUN apk add --no-cache $BUILD_PACKAGES $RUBY_PACKAGES && mkdir /usr/app WORKDIR /usr/app COPY Gemfile /usr/app/ RUN bundle install FROM alpine:3.7 RUN apk add --no-cache ruby COPY --from=builder /usr/lib/ruby /usr/lib/ruby COPY ./openweathermap2k.rb /usr/local/bin ENTRYPOINT ["/usr/local/bin/openweathermap2k.rb"]
El primero lo necesita bundle para instalar las gemas necesarias para el funcionamiento del script. El segundo es el archivo Dockerfile que buscará el constructor de imágenes de docker (docker build) para crear la imagen desde la que se instanciará el contenedor. Un detalle importante a tener en cuenta es que se requiere de una versión de docker lo más actualizada posible (al menos docker 17.05 o superior) para que acepte la opción Multistage build que se usa en este Dockerfile.
Así pues, con estos tres archivos ubicados juntos en el mismo directorio, ejecutamos un build desde ahí:
# docker build -t test/openweathremap2k . Sending build context to Docker daemon 5.632kB Step 1/13 : FROM alpine:3.7 as builder ---> 34ea7509dcad ... Step 13/13 : ENTRYPOINT ["/usr/local/bin/openweathermap2k.rb"] ---> Running in 4d85d974a170 Removing intermediate container 4d85d974a170 ---> 962722eae958 Successfully built 962722eae958 Successfully tagged test/openweathremap2k:latest
El proceso es muy rápido y sencillo, y nos permite obtener una imagen de menos de 40MB que podemos ejecutar en cualquier sistema que soporte dockers. Como viene siendo habitual cuando usamos este sistema, los parámetros de configuración se pasan a través de variables de entorno:
- KAFKA_BROKER: dirección IP del broker de kafka al que conectar, que en el Prozzie suele ser la IP principal de gestión.
- KAFKA_PORT: puerto TCP del broker de kafka, por defecto 9092 (y por tanto opcional).
- KAFKA_TOPIC: topic de kafka al que suscribirse para publicar, por defecto openweathermap.
- LATITUDE and LONGITUDE: coordenadas de la posición sobre la que se desean obtener los valores de las condiciones ambientales, en radianes. Por ejemplo, Sevilla sería LATITUDE=37.3755, LONGITUDE=-6.0251.
- URL: dirección URL completa si se quiere usar una alternativa a la que el programa ofrece (con lo que LONGITUDE y LATITUDE no se aplicarían).
- TIME: tiempo de espera entre cada petición, en segundos. Por defecto son 900 segundos (15 minutos) y como mínimo 600 segundos (10 minutos).
- APIKEY: clave a usar para realizar las queries y que se obtiene cuando nos registramos en el servicio.
Una vez que tengamos los datos necesarios, podemos hacer una primera ejecución de prueba:
# docker run --env DEBUG=1 --env APIKEY="abc123abc123abc123" --env LONGITUDE="-6.025" --env LATITUDE="37.39" --env KAFKA_BROKER="10.128.15.51" --env KAFKA_PORT="9092" --network prozzie_default -it test/openweathremap2k [openweathermap2k] Starting openweathermap thread [openweathermap2k] Connecting to http://api.openweathermap.org/data/2.5/weather?lat=37.39&lon=-6.025&units=metric&appid=abc123abc123abc123 openweathermap event: {"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":31.54,"pressure":1021,"humidity":29,"temp_min":31,"temp_max":32},"visibility":10000,"wind":{"speed":3.6,"deg":50},"clouds":{"all":0},"dt":1537696800,"sys":{"type":1,"id":5511,"message":0.0104,"country":"ES","sunrise":1537683179,"sunset":1537726747},"id":6360976,"name":"Camas","cod":200,"timestamp":1537698909} ^C # prozzie kafka consume openweathermap --from-beginning {"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":31.54,"pressure":1021,"humidity":29,"temp_min":31,"temp_max":32},"visibility":10000,"wind":{"speed":3.6,"deg":50},"clouds":{"all":0},"dt":1537696800,"sys":{"type":1,"id":5511,"message":0.0104,"country":"ES","sunrise":1537683179,"sunset":1537726747},"id":6360976,"name":"Camas","cod":200,"timestamp":1537698909} ^C
Es importante poner la opción “–network prozzie_default” en la ejecución del docker para que el contenedor pueda alcanzar al broker de kafka. Si todo ha salido correctamente, podemos ejecutar el contenedor con la opción detach.
# docker run -d --env APIKEY="abc123abc123abc123" --env LONGITUDE="-6.025" --env LATITUDE="37.375" --env KAFKA_BROKER="10.128.15.51" --env KAFKA_PORT="9092" --network prozzie_default -it test/openweathremap2k 7f5e5dea4a5b730b32e851cd821cad09499df83060ef37d191c3087fa5630723
A partir de este momento, empezarán a aparecer mensajes en el topic openweathermap cada 15 minutos para la localización que hemos indicado.
# prozzie kafka consume openweathermap {"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":32,"pressure":1021,"humidity":27,"temp_min":32,"temp_max":32},"visibility":10000,"wind":{"speed":3.1,"deg":60},"clouds":{"all":0},"dt":1537698600,"sys":{"type":1,"id":5511,"message":0.003,"country":"ES","sunrise":1537683181,"sunset":1537726745},"id":6360976,"name":"Camas","cod":200,"timestamp":1537700674} {"coord":{"lon":-6.03,"lat":37.39},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":33.54,"pressure":1020,"humidity":26,"temp_min":33,"temp_max":34},"visibility":10000,"wind":{"speed":4.6,"deg":70},"clouds":{"all":0},"dt":1537700400,"sys":{"type":1,"id":5511,"message":0.3452,"country":"ES","sunrise":1537683181,"sunset":1537726744},"id":6360976,"name":"Camas","cod":200,"timestamp":1537701575}
Conclusiones
Como resumen final, queremos indicar que la creación de conectores para el Prozzie se puede realizar de muchas y variadas formas, todas perfectamente válidas. Hemos visto una de ellas, creando un script en Ruby que empaquetamos en formato docker para lanzarlo como un contenedor más, realizando queries cada 15 minutos y enviando el dato adquirido a un topic de Kafka para que, posteriormente, alcance el pipeline de WDP.
El flujo principal de estos datos se puede potenciar con los nuevos datos y así mejorar la información mediante procesos de normalización y de enriquecimiento.
En artículos próximos veremos este proceso con más detenimiento.
We hope you have found this small tutorial useful. Please don’t hesitate to contact us at technology@wizzie.io if you need further information.
Recent Comments