When you need to acquire data from numerous sources, you are usually confronted with different vendors, protocols and data formats. Even for the same type of sensor, you rarely get messages in the same format. Therefore you need a way to unify or “normalize” your data. And that’s exactly where Wizzie Normalizer comes into play during the data acquisition process.

Pablo Casares

Stream Processing Scenarios

There are a lot of scenarios where you have different data sources you want to process. Usually, the information you wish to ingest into your system needs to be transformed in some way. Renaming event keys, modifying values or normalizing data from a variety of sources are very common needs. On other occasions you may need to reshape your data before you can process it at all. In all of these situations, Normalizer helps you.


Wizzie Normalizer Key Features

Normalizer is a service based on the Kafka Streams library. This means it inherits the strengths of that stream processing library, such as its scalability and processing guarantees. Normalizer depends on Kafka and expects JSON data messages.

And don’t worry if your system doesn’t use JSON events. For that we have Wizzie Prozzie. Prozzie helps you ingest a great number of protocols because it is based on Kafka Connect. It transforms your data into JSON, so you can use our entire pipeline. You can check the Prozzie supported protocols at: https://github.com/wizzie-io/prozzie#supported-protocols. We will cover it in a future post.

Going back to Normalizer, you can think of it as a stream processing engine. And we have made things easy for you: if you want to transform a data stream from one source to another, you simply run Normalizer and load a stream configuration file telling Normalizer what to do, and it will perform all those transformations for you.

Another key Normalizer feature is that it is designed for Big Data environments. It is fully scalable and very fast, and it allows you to create a cluster of instances that will distribute the processing load across them.


How Normalizer scales up and down

Because Normalizer is based on Kafka Streams, you only have to start or stop instances to scale it up or down. The underlying Kafka Streams library will distribute the Kafka topic partitions across the running Normalizer instances. If your processing needs increase, you just run another Normalizer instance and the Normalizer cluster will share the load. If your processing needs shrink, you just stop some instances.
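
Keep in mind that parallelism is bounded by the number of partitions in your input topics: with N partitions, at most N Normalizer instances will actually receive work. As a quick sketch (the topic name and broker address are placeholders taken from the example later in this post), you can check a topic’s partition count with the standard Kafka CLI:

# Shows the partition count of "mytopic"; this bounds the useful number of Normalizer instances.
kafka-topics.sh --describe --topic mytopic --bootstrap-server 192.168.1.100:9092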


How to tell Normalizer what to do

When we designed Normalizer, we thought about how to easily configure a cluster of instances. Because we already have Kafka as a dependency, we made every Normalizer instance load its configuration from a special topic. We call it a bootstrap topic. This way, when you have several Normalizer instances, you simply publish your stream configuration file to the bootstrap Kafka topic that every Normalizer listens to, and they will all be configured at once.


Normalizer in action

Our Normalizer allows you to process data from almost every possible environment. As an example, imagine a Smart City scenario. This imaginary city would have a lot of sensors reporting JSON data. Imagine it has sensors for humidity, traffic lights, bus service, smart parking, urban noise, traffic congestion, air pollution and so on. These sensors may be from different manufacturers and their reported data may also be different. For example, the temperature could be reported as:

{"temp": 23.0} and {"temperature": 23.0}

Sensor manufacturer “X” would report the first one and sensor manufacturer “Y” would report the second one.

When you need to act on these or other values, it is much more convenient to have normalized information. If every sensor reports the same information in the same way, you only have to work with one normalized schema instead of every vendor-specific one. It is much easier to operate your system if every temperature reading arrives under a single key, for example “temp”, rather than sometimes “temp” and sometimes “temperature”.

Normalizer lets you do this easily, quickly and in a configurable way. These are some of the transformations you can apply to your data events:

  • Add fields.
  • Select or simplify fields.
  • Replace values.
  • Join two or more values.
  • Select the max/min from an array of values.
  • Classify numeric values.
  • Split one value into multiple dimensions.
  • Convert time formats.
  • Convert field types.
  • Calculate arithmetic values.
  • Rename fields.
  • Filter events.
  • Transform one message into several.

You can, of course, combine these functionalities to design a complex system. And you can develop your own functions if you need a feature that is not supported out of the box. We made it easy to implement new functions, because we developed this engine with extensibility in mind.

We will cover our ZZ-CEP and Enricher in future posts. These services will allow you to join data, provide context and make decisions based on it.

If you need more information about the Normalizer functionalities, you can check our docs at: https://wizzie-io.github.io/normalizer/

And, for implementing a new functionality, you can visit:

https://wizzie-io.github.io/normalizer/customize/custom-development.html
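
As a rough illustration only, a custom mapper might look something like the sketch below. The base class, method signatures and package names here are assumptions on our part rather than verified API, so please treat the custom development page above as the authoritative reference:

package com.example.normalizer.funcs; // hypothetical package for your own functions

import java.util.Map;
import org.apache.kafka.streams.KeyValue;
import io.wizzie.normalizer.funcs.MapperFunction; // assumed base class, see the docs
import io.wizzie.metrics.MetricsManager;          // assumed metrics helper, see the docs

public class CelsiusToFahrenheitMapper extends MapperFunction {

    @Override
    public void prepare(Map<String, Object> properties, MetricsManager metricsManager) {
        // Read any "properties" defined for this function in the stream plan.
    }

    @Override
    public KeyValue<String, Map<String, Object>> process(String key, Map<String, Object> value) {
        // Convert a "temp" field from Celsius to Fahrenheit, if present.
        Object temp = value.get("temp");
        if (temp instanceof Number) {
            value.put("temp", ((Number) temp).doubleValue() * 9 / 5 + 32);
        }
        return new KeyValue<>(key, value);
    }

    @Override
    public void stop() {
        // Release any resources acquired in prepare().
    }
}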

Making our Smart City smart

Now, we are going to show you the use and configuration of Normalizer. First, you need to know that Normalizer can be used on top of Docker or just as a Java process.

If you want to use it as a Java process, visit https://github.com/wizzie-io/normalizer and follow the “compiling sources” section.

If you want to use Normalizer on Docker, we have published the Normalizer Docker image on Docker Hub: https://hub.docker.com/r/wizzieio/normalizer/

For this particular example, we are going to use the dockerized Normalizer. First of all, we need to run a Normalizer instance:

docker run -e APPLICATION_ID=my-normalizer -e KAFKA_BOOTSTRAP_SERVER=192.168.1.100:9092 wizzieio/normalizer:latest

After that, we need to load a stream plan into Normalizer to tell it what to do with the data events:

{
    "inputs": {
        "mytopic": ["mystream"]
    },
    "streams": {
        "mystream": {
            "funcs": [
                {
                    "name": "myMapper",
                    "className": "io.wizzie.normalizer.funcs.impl.RenameMapper",
                    "properties": {
                        "maps": [
                            {"dimPath": ["temperature"], "as": "temp"}
                        ]
                    }
                }
            ],
            "sinks": [
                {"topic": "output", "type": "kafka"}
            ]
        }
    }
}
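
This stream plan tells Normalizer to read events from the topic “mytopic”, rename the “temperature” field to “temp”, and write the result to the topic “output”. For example, an incoming event would be transformed as follows:

Input event on "mytopic":   {"temperature": 23.0}
Output event on "output":   {"temp": 23.0}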

Once you have defined the stream plan, you need to send this JSON stream configuration to Kafka, using the key my-normalizer (the APPLICATION_ID) and the topic __normalizer_bootstrap, as explained in the “How to tell Normalizer what to do” section. The easiest way to do this is to use our “streamer-kafka.sh” tool, which you can find in the Normalizer GitHub repository. After that, each Normalizer instance will rename the “temperature” key to “temp” in every incoming event on the topic “mytopic”.
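
If you prefer plain Kafka tooling over the helper script, the same keyed message can be published with the standard console producer. This is only a sketch: the file name stream-plan.json and the broker address are placeholders, and the JSON has to be sent as a single line:

# Publish the one-line stream plan, keyed by the APPLICATION_ID ("my-normalizer").
echo "my-normalizer:$(tr -d '\n' < stream-plan.json)" | \
  kafka-console-producer.sh --broker-list 192.168.1.100:9092 --topic __normalizer_bootstrap \
    --property parse.key=true --property key.separator=: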

As you can see, the stream processing configuration file lets you use our built-in functions or your own custom-developed ones, and lets you specify which topics to read from and write to.

As described earlier, scaling this service is very easy. You simply have to start more Normalizer Docker containers. Each one will claim its own Kafka partitions and, together, they will process the entire streaming data topic for you.
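
For instance, starting a second container with the same APPLICATION_ID is enough; Kafka Streams will rebalance the topic partitions between the two instances automatically. This simply reuses the exact command from before:

docker run -e APPLICATION_ID=my-normalizer -e KAFKA_BOOTSTRAP_SERVER=192.168.1.100:9092 wizzieio/normalizer:latest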

For more information about Normalizer configuration, usage and development, please visit https://wizzie-io.github.io/normalizer/

We hope you find this small tutorial useful. Please don’t hesitate to contact us at technology@wizzie.io if you need further information.
