Exposing Apache Karaf configurations with Apache Arrow Flight
This blog post shows how to use Apache Arrow and Arrow Flight in the Apache Karaf runtime.
The code base is available here: https://github.com/jbonofre/karaf-arrow.
Apache Arrow?
Apache Arrow is composed of:
- The Arrow format (specification).
- Format implementations in different languages (C, Java, Go, Rust, ...).
- Extra subprojects like Flight, Flight SQL, ADBC, nanoarrow, ...
The Arrow format is implemented in many languages, including:
- C
- C++
- C#
- Go
- Java
- Python
- Ruby
- Rust
Some of the extra subprojects:
- ADBC (Arrow Database Connectivity) is an API specification for Arrow-based database access. It's similar to JDBC, but Arrow-centric.
- Arrow Flight and Arrow Flight SQL are RPC frameworks (see later).
- DataFusion is a query engine that uses Arrow as its in-memory format.
Arrow Flight server in Karaf runtime
The config-flight-provider is an OSGi bundle that exposes the Configurations of the ConfigurationAdmin service as Arrow streams.
It's composed of:
- ConfigDataset represents a Configuration (identified by a PID) using an Arrow schema and vectors.
- ConfigProducer is an Arrow Flight producer exposing each Configuration as an Arrow Flight stream, using the PID as the stream path.
- Activator is an OSGi bundle activator, loading all ConfigDatasets using the ConfigurationAdmin service and starting an Arrow Flight server.
Representing Apache Karaf Configurations as Arrow schema and vectors
Let's take a look at ConfigDataset.
Basically, using the ConfigurationAdmin Karaf service, we can get all runtime Configurations. Each Configuration has a unique id, the PID (persistent id). A Configuration contains a list of key/value pairs, the configuration properties.
For example, we can have a Configuration with the my.karaf.configuration PID and the foo/bar and john/doo properties.
The ConfigDataset class represents the Configuration properties (the associated PID is managed in ConfigProducer, as we will see later).
In Arrow, we can represent a list of values with a vector. So, to represent the Configuration properties, we use two vectors: a key vector and a value vector, both of type VarCharVector.
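The following sketch illustrates the two-column idea in plain Java. It does not use the Arrow API. A Dictionary stands in for Configuration.getProperties(). PropertiesColumns is a hypothetical name, and its two lists play the role of the key and value vectors. Row i is made of keys.get(i) and values.get(i). Iterating over the keys once and looking up each value keeps both columns aligned.

```java
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.List;

// Hypothetical helper (not Arrow): models the "key" and "value" columns with plain lists.
final class PropertiesColumns {
    final List<String> keys = new ArrayList<>();
    final List<String> values = new ArrayList<>();

    PropertiesColumns(Dictionary<String, Object> properties) {
        // Single pass over the keys: keys.get(i) and values.get(i) always describe the same property.
        Enumeration<String> e = properties.keys();
        while (e.hasMoreElements()) {
            String key = e.nextElement();
            keys.add(key);
            values.add(String.valueOf(properties.get(key)));
        }
    }

    int rowCount() {
        return keys.size();
    }

    public static void main(String[] args) {
        // Same properties as the my.karaf.configuration example.
        Dictionary<String, Object> props = new Hashtable<>();
        props.put("foo", "bar");
        props.put("john", "doo");
        PropertiesColumns columns = new PropertiesColumns(props);
        // Hashtable iteration order is unspecified, so the row order may vary.
        for (int i = 0; i < columns.rowCount(); i++) {
            System.out.println(columns.keys.get(i) + " = " + columns.values.get(i));
        }
    }
}
```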
Then, we describe the Configuration properties using an Arrow schema with two fields: the key field is not nullable, and the value field is nullable.
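In memory, each VarCharVector follows the variable-size binary layout of the Arrow columnar format. That layout has three buffers:
- a validity bitmap, with one bit per slot, where 1 means non-null (least significant bit first);
- an offsets buffer of n + 1 32-bit integers;
- a contiguous buffer of UTF-8 data.

The plain-Java sketch below is a simplified model that ignores buffer padding and alignment. The VarCharLayout class is hypothetical and is not the Arrow API. It builds those three buffers and shows what the nullable flag allows. In this model, a null slot only leaves its validity bit at 0 and takes no data bytes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of an Arrow Utf8 (VarChar) column's buffers; not the Arrow API.
final class VarCharLayout {
    final byte[] validity; // bit i set => slot i is non-null (least significant bit first)
    final int[] offsets;   // slot i spans data[offsets[i] .. offsets[i + 1])
    final byte[] data;     // UTF-8 bytes of all non-null values, back to back

    VarCharLayout(List<String> values, boolean nullable) {
        int n = values.size();
        validity = new byte[(n + 7) / 8];
        offsets = new int[n + 1];
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < n; i++) {
            String v = values.get(i);
            if (v == null) {
                if (!nullable) {
                    throw new IllegalArgumentException("null in non-nullable column at slot " + i);
                }
                // Null slot: validity bit stays 0 and the span has zero length.
            } else {
                validity[i / 8] |= (byte) (1 << (i % 8));
                byte[] bytes = v.getBytes(StandardCharsets.UTF_8);
                out.write(bytes, 0, bytes.length);
            }
            offsets[i + 1] = out.size();
        }
        data = out.toByteArray();
    }

    String get(int i) {
        if ((validity[i / 8] & (1 << (i % 8))) == 0) {
            return null;
        }
        return new String(data, offsets[i], offsets[i + 1] - offsets[i], StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        VarCharLayout keys = new VarCharLayout(Arrays.asList("foo", "john"), false);
        System.out.println(Arrays.toString(keys.offsets)); // [0, 3, 7]
        VarCharLayout values = new VarCharLayout(Arrays.asList("bar", null), true);
        System.out.println(values.get(1)); // null
    }
}
```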
Let's take a look at the ConfigDataset code:
package net.nanthrax.karaf.arrow;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.osgi.service.cm.Configuration;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Dictionary;
import java.util.Enumeration;

public class ConfigDataset implements AutoCloseable {

    private final Schema schema;
    private final VectorSchemaRoot root;
    private final long rows;

    public ConfigDataset(Configuration configuration, BufferAllocator allocator) {
        // Two UTF-8 fields: "key" (not nullable) and "value" (nullable).
        this.schema = new Schema(Arrays.asList(
                new Field("key", FieldType.notNullable(new ArrowType.Utf8()), null),
                new Field("value", FieldType.nullable(new ArrowType.Utf8()), null)
        ));

        Dictionary<String, Object> properties = configuration.getProperties();
        int size = properties.size();

        VarCharVector keyVector = new VarCharVector("key", allocator);
        VarCharVector valueVector = new VarCharVector("value", allocator);
        keyVector.allocateNew(size);
        valueVector.allocateNew(size);

        // Iterate once over the keys so that each key and its value land on the same row.
        int index = 0;
        Enumeration<String> keys = properties.keys();
        while (keys.hasMoreElements()) {
            String key = keys.nextElement();
            String value = properties.get(key).toString();
            // setSafe() grows the buffers if the data exceeds the initial allocation.
            keyVector.setSafe(index, key.getBytes(StandardCharsets.UTF_8));
            valueVector.setSafe(index, value.getBytes(StandardCharsets.UTF_8));
            index++;
        }
        // Without setValueCount(), the vectors would report zero values.
        keyVector.setValueCount(size);
        valueVector.setValueCount(size);

        this.root = new VectorSchemaRoot(schema, Arrays.asList(keyVector, valueVector), size);
        this.rows = size;
    }

    public Schema getSchema() {
        return this.schema;
    }

    public VectorSchemaRoot getRoot() {
        return this.root;
    }

    public long getRows() {
        return this.rows;
    }

    @Override
    public void close() {
        // Closing the root releases the memory of its vectors.
        root.close();
    }
}
In the constructor, we build the schema with the two fields (key and value). Then, we create the two corresponding vectors. For the memory allocation, we use the number of properties in the Configuration (configuration.getProperties().size()). Then, we iterate over the Configuration properties to populate the vectors. It's important to set the number of values in the vectors using vector.setValueCount(configuration.getProperties().size()); otherwise, their value count stays at 0.
Once we have the schema and the vectors, we "assemble" them in a VectorSchemaRoot.
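Conceptually, a VectorSchemaRoot ties the schema to one vector per field. All the vectors are expected to hold the same number of values, and that shared number is the row count. This is also why setting the value count on each vector matters. The plain-Java sketch below makes that invariant explicit, together with the nullability rule from the schema. MiniBatch is a hypothetical class, not the Arrow API, and Arrow itself does not necessarily perform these checks.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of "schema + equal-length columns"; not the Arrow API.
final class MiniBatch {
    // A column: field name, nullability (like Arrow's Field/FieldType), and its values.
    static final class Column {
        final String name;
        final boolean nullable;
        final List<String> values;

        Column(String name, boolean nullable, List<String> values) {
            this.name = name;
            this.nullable = nullable;
            this.values = values;
        }
    }

    final List<Column> columns;
    final int rowCount;

    MiniBatch(List<Column> columns) {
        int rows = columns.isEmpty() ? 0 : columns.get(0).values.size();
        for (Column c : columns) {
            // Every column must hold exactly rowCount values.
            if (c.values.size() != rows) {
                throw new IllegalArgumentException("column " + c.name + " has "
                        + c.values.size() + " values, expected " + rows);
            }
            // A non-nullable field must not contain nulls.
            if (!c.nullable && c.values.contains(null)) {
                throw new IllegalArgumentException("column " + c.name + " is not nullable");
            }
        }
        this.columns = columns;
        this.rowCount = rows;
    }

    public static void main(String[] args) {
        MiniBatch batch = new MiniBatch(Arrays.asList(
                new Column("key", false, Arrays.asList("foo", "john")),
                new Column("value", true, Arrays.asList("bar", "doo"))));
        System.out.println(batch.rowCount); // 2
    }
}
```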
Implementing an Arrow FlightProducer
Now that we have the ConfigDataset configuration representation, we can implement an Arrow Flight producer.
The ConfigProducer is an Arrow Flight producer: it implements the methods that the Arrow Flight server binds to its RPC calls (leveraging gRPC under the hood). In our case, we expose three services:
- getStream() implements the DoGet call, to retrieve a data stream (in our case, the configuration data stream for a given PID)
- getFlightInfo() implements the GetFlightInfo call, to retrieve information about a stream (for a given PID)
- listFlights() implements the ListFlights call, to get all available flight streams.
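The sketch below shows how the three calls fit together. It is an in-memory, plain-Java model with no gRPC or Arrow involved. The MiniFlightService class, its Info type and its put() method are hypothetical. The model follows the same conventions as ConfigProducer: the descriptor path is the PID, and the ticket is the PID encoded as UTF-8 bytes. A client first lists the flights, then picks a flight info and redeems its ticket to get the data.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical in-memory model of ListFlights / GetFlightInfo / DoGet; not the Flight API.
final class MiniFlightService {
    // What getFlightInfo() returns: descriptor path, ticket, and record count.
    static final class Info {
        final List<String> path;
        final byte[] ticket;
        final long records;

        Info(List<String> path, byte[] ticket, long records) {
            this.path = path;
            this.ticket = ticket;
            this.records = records;
        }
    }

    // PID -> configuration properties (stand-in for the map of ConfigDatasets).
    private final Map<String, Map<String, String>> datasets = new LinkedHashMap<>();

    void put(String pid, Map<String, String> properties) {
        datasets.put(pid, properties);
    }

    Info getFlightInfo(List<String> path) {
        String pid = path.get(0);
        Map<String, String> dataset = datasets.get(pid);
        if (dataset == null) {
            throw new NoSuchElementException("PID not found: " + pid);
        }
        // The ticket simply carries the PID.
        return new Info(path, pid.getBytes(StandardCharsets.UTF_8), dataset.size());
    }

    List<Info> listFlights() {
        List<Info> infos = new ArrayList<>();
        for (String pid : datasets.keySet()) {
            infos.add(getFlightInfo(Collections.singletonList(pid)));
        }
        return infos;
    }

    Map<String, String> getStream(byte[] ticket) {
        // Decode the PID from the ticket, then look up the data.
        String pid = new String(ticket, StandardCharsets.UTF_8);
        Map<String, String> dataset = datasets.get(pid);
        if (dataset == null) {
            throw new NoSuchElementException("PID not found: " + pid);
        }
        return dataset;
    }
}
```

Because the ticket is opaque to the client, the server is free to put anything in it. Using the PID keeps getStream() a simple map lookup.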
Let's take a look at the ConfigProducer code:
package net.nanthrax.karaf.arrow;

import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.osgi.framework.BundleContext;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConfigProducer extends NoOpFlightProducer implements AutoCloseable {

    private final static Logger LOGGER = LoggerFactory.getLogger(ConfigProducer.class);

    private final BufferAllocator allocator;
    private final Location location;
    // One dataset per Configuration, keyed by a FlightDescriptor whose path is the PID.
    private final ConcurrentMap<FlightDescriptor, ConfigDataset> datasets;

    public ConfigProducer(BufferAllocator allocator, Location location) {
        this.allocator = allocator;
        this.datasets = new ConcurrentHashMap<>();
        this.location = location;
    }

    public void init(BundleContext bundleContext) throws IllegalStateException, IOException, InvalidSyntaxException {
        ServiceReference<ConfigurationAdmin> serviceReference = bundleContext.getServiceReference(ConfigurationAdmin.class);
        if (serviceReference == null) {
            throw new IllegalStateException("ConfigurationAdmin service is not available");
        }
        ConfigurationAdmin configurationAdmin = bundleContext.getService(serviceReference);
        if (configurationAdmin == null) {
            throw new IllegalStateException("ConfigurationAdmin service is not available");
        }
        init(configurationAdmin);
    }

    public void init(ConfigurationAdmin configurationAdmin) throws IllegalStateException, IOException, InvalidSyntaxException {
        // listConfigurations() returns null (not an empty array) when there is no configuration.
        Configuration[] configurations = configurationAdmin.listConfigurations(null);
        if (configurations == null) {
            return;
        }
        for (Configuration configuration : configurations) {
            FlightDescriptor flightDescriptor = FlightDescriptor.path(configuration.getPid());
            LOGGER.info("Processing configuration {}", flightDescriptor.getPath());
            ConfigDataset dataset = new ConfigDataset(configuration, allocator);
            datasets.put(flightDescriptor, dataset);
        }
    }

    @Override
    public void getStream(CallContext callContext, Ticket ticket, ServerStreamListener listener) {
        // The ticket carries the PID as UTF-8 bytes (see getFlightInfo()).
        String pid = new String(ticket.getBytes(), StandardCharsets.UTF_8);
        LOGGER.info("Get stream for PID {}", pid);
        FlightDescriptor flightDescriptor = FlightDescriptor.path(pid);
        ConfigDataset dataset = this.datasets.get(flightDescriptor);
        if (dataset == null) {
            throw CallStatus.NOT_FOUND.withDescription("PID not found").toRuntimeException();
        }
        listener.start(dataset.getRoot());
        LOGGER.info("  Row count: {}", dataset.getRoot().getRowCount());
        // Send the current content of the root as one batch, then end the stream.
        listener.putNext();
        listener.completed();
    }

    @Override
    public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) {
        ConfigDataset dataset = datasets.get(descriptor);
        if (dataset == null) {
            throw CallStatus.NOT_FOUND.withDescription("PID not found").toRuntimeException();
        }
        // The endpoint ticket is the PID; clients redeem it with getStream() on this location.
        Ticket ticket = new Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8));
        FlightEndpoint flightEndpoint = new FlightEndpoint(ticket, location);
        return new FlightInfo(
                dataset.getSchema(),
                descriptor,
                Collections.singletonList(flightEndpoint),
                -1, // size in bytes is unknown
                dataset.getRows()
        );
    }

    @Override
    public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
        datasets.forEach((descriptor, dataset) -> listener.onNext(getFlightInfo(context, descriptor)));
        listener.onCompleted();
    }

    public ConfigDataset getConfigDataset(String pid) {
        FlightDescriptor flightDescriptor = FlightDescriptor.path(pid);
        return datasets.get(flightDescriptor);
    }

    @Override
    public void close() throws Exception {
        AutoCloseables.close(datasets.values());
    }
}
In the init() method, we get the ConfigurationAdmin Karaf service and populate the datasets map. This map contains all ConfigDatasets, keyed by a FlightDescriptor whose path is the PID. As we will see later, the init() method is called in the bundle activator.
The getFlightInfo() method uses the FlightDescriptor (whose path is the PID) to retrieve the corresponding ConfigDataset from the datasets map. It also uses it to create a FlightEndpoint made of a Ticket containing the PID and the server location. We use the ConfigDataset to get the schema and the number of rows.
The listFlights() method simply calls getFlightInfo() for each dataset and sends each resulting FlightInfo to the provided StreamListener.
The "main" method is getStream(): it gets the PID from the ticket provided by the client and retrieves the corresponding ConfigDataset from the datasets map. If the PID key doesn't exist in the map, we throw CallStatus.NOT_FOUND.withDescription("PID not found").toRuntimeException(). If the PID is found, we retrieve its VectorSchemaRoot and start the provided ServerStreamListener with it. We then send the data to the client (listener.putNext()) and complete (close) the stream with listener.completed().
Starting Arrow Flight Server
Finally, we need to connect the dots (injecting the ConfigurationAdmin Karaf service) and start the Arrow Flight server. This is done in the Activator.
In the start() method of the Activator:
- we create the root Arrow allocator (that we pass to the other components)
- we create the location where the Flight server will listen
- we create and init the ConfigProducer
- and finally we create and start an Arrow Flight server
package net.nanthrax.karaf.arrow;

import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Activator implements BundleActivator {

    private final static Logger LOGGER = LoggerFactory.getLogger(Activator.class);

    private BufferAllocator allocator;
    private ConfigProducer configProducer;
    private FlightServer flightServer;

    @Override
    public void start(BundleContext context) throws Exception {
        // Root allocator shared by the producer (vectors) and the server.
        allocator = new RootAllocator();
        // Listen on all interfaces, port 33333, plaintext gRPC (no TLS).
        Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
        configProducer = new ConfigProducer(allocator, location);
        configProducer.init(context);
        flightServer = FlightServer.builder(allocator, location, configProducer).build();
        flightServer.start();
        LOGGER.info("Starting Arrow Flight Server on port {}", flightServer.getPort());
    }

    @Override
    public void stop(BundleContext context) throws Exception {
        LOGGER.info("Stopping Arrow Flight Server");
        // close() shuts the server down and waits for it to terminate before the vectors are released.
        flightServer.close();
        configProducer.close();
        allocator.close();
    }
}
The Activator is declared in the OSGi bundle headers using the Bundle-Activator header.
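In stop(), resources are released in the reverse order of their creation. The server is stopped first, then the producer closes its vectors, and the allocator is closed last. An Arrow allocator throws an IllegalStateException when it is closed while memory is still allocated from it. Java's try-with-resources gives the same last-in, first-out ordering for free. The small sketch below only demonstrates that ordering with a hypothetical Resource class. It is not how a BundleActivator is structured, since start() and stop() are separate calls.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: records the order in which resources are closed.
final class ShutdownOrder {
    static final class Resource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add(name);
        }
    }

    static List<String> run() {
        List<String> closed = new ArrayList<>();
        // Declared in creation order: allocator, producer, server.
        try (Resource allocator = new Resource("allocator", closed);
             Resource producer = new Resource("producer", closed);
             Resource server = new Resource("server", closed)) {
            // ... serve requests ...
        }
        // try-with-resources closed them in reverse declaration order.
        return closed;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [server, producer, allocator]
    }
}
```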
Deploy in Apache Karaf runtime
For convenience, we create a Karaf features repository for deployment. Even though the config-flight-provider embeds most of its dependencies as private packages, it still imports some packages. This means that other bundles have to be deployed in the runtime first. That's why a features repository simplifies the deployment:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<features name="karaf-arrow-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.3.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.3.0 http://karaf.apache.org/xmlns/features/v1.3.0">
<feature name="karaf-arrow-server" version="${project.version}">
<bundle dependency="true">mvn:javax.annotation/javax.annotation-api/1.3.2</bundle>
<bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-core/2.15.3</bundle>
<bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-annotations/2.15.3</bundle>
<bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-databind/2.15.3</bundle>
<bundle dependency="true">mvn:com.fasterxml.jackson.datatype/jackson-datatype-jsr310/2.15.3</bundle>
<bundle dependency="true">mvn:commons-codec/commons-codec/1.16.0</bundle>
<bundle>mvn:net.nanthrax.karaf.arrow/config-flight-provider/${project.version}</bundle>
</feature>
</features>
Let's use the Apache Karaf 4.4.4 runtime. Download Apache Karaf (from https://www.apache.org/dyn/closer.lua/karaf/4.4.4/apache-karaf-4.4.4.tar.gz) and extract it in the folder of your choice.
When using Apache Arrow with Java 9+, we have to open the java.nio package (for reflective access) at the JVM level. In Apache Karaf, to do that, edit the bin/karaf script and add:
--add-opens=java.base/java.nio=ALL-UNNAMED
in the start section.
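A small diagnostic can check whether the flag is actually in effect in a given JVM. The sketch below uses the standard java.lang.Module API (Java 9+) and is not part of Arrow or Karaf. The NioOpenCheck class is hypothetical. Classes loaded from the classpath, like OSGi bundle classes loaded by bundle class loaders, live in an unnamed module, which is why ALL-UNNAMED is the target of the flag.

```java
import java.nio.ByteBuffer;

// Diagnostic sketch: is java.base's java.nio package open to this class's unnamed module?
final class NioOpenCheck {
    static boolean javaNioOpenToClasspath() {
        Module javaBase = ByteBuffer.class.getModule();
        // Each class loader has its own unnamed module; ALL-UNNAMED covers all of them.
        Module unnamed = NioOpenCheck.class.getClassLoader().getUnnamedModule();
        return javaBase.isOpen("java.nio", unnamed);
    }

    public static void main(String[] args) {
        // Expected to print true when the JVM was started with --add-opens=java.base/java.nio=ALL-UNNAMED.
        System.out.println("java.nio open to ALL-UNNAMED: " + javaNioOpenToClasspath());
    }
}
```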
Now, we can start the Karaf runtime with:
$ bin/karaf
__ __ ____
/ //_/____ __________ _/ __/
/ ,< / __ `/ ___/ __ `/ /_
/ /| |/ /_/ / / / /_/ / __/
/_/ |_|\__,_/_/ \__,_/_/
Apache Karaf (4.4.4)
Hit '<tab>' for a list of available commands
and '[cmd] --help' for help on a specific command.
Hit '<ctrl-d>' or type 'system:shutdown' or 'logout' to shutdown Karaf.
karaf@root()>
Now, we can register our features repository and deploy our bundle via the karaf-arrow-server feature:
karaf@root()> feature:repo-add mvn:net.nanthrax.karaf.arrow/karaf-arrow/1.0-SNAPSHOT/xml/features
Adding feature url mvn:net.nanthrax.karaf.arrow/karaf-arrow/1.0-SNAPSHOT/xml/features
karaf@root()> feature:install karaf-arrow-server
In the log (you can use the log:tail or log:display command), you can see that the Arrow Flight server has started:
17:57:06.021 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.config]
17:57:06.021 INFO [FelixStartLevel] Processing configuration [profile]
17:57:06.021 INFO [FelixStartLevel] Processing configuration [jmx.acl.osgi.compendium.cm]
17:57:06.022 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.shell]
17:57:06.022 INFO [FelixStartLevel] Processing configuration [org.ops4j.pax.url.mvn]
17:57:06.023 INFO [FelixStartLevel] Processing configuration [jmx.acl.org.apache.karaf.bundle]
17:57:06.023 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.system]
17:57:06.023 INFO [FelixStartLevel] Processing configuration [org.ops4j.pax.logging]
17:57:06.024 INFO [FelixStartLevel] Processing configuration [jmx.acl]
17:57:06.024 INFO [FelixStartLevel] Processing configuration [jmx.acl.org.apache.camel]
17:57:06.024 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.kar]
17:57:06.025 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.feature]
17:57:06.104 INFO [FelixStartLevel] Starting Arrow Flight Server on port 33333
Now, we can use an Arrow Flight client to interact with the server.
Arrow Flight Client
An Arrow Flight client is pretty easy to implement. In this blog post, we write a simple CLI client that interacts with the Flight server running in the Karaf runtime. The client does the following:
- create an Arrow Flight client using the location of our Flight server (localhost:33333 in our case)
- request the FlightInfo for the PID of the configuration we want to get
- using the FlightInfo, get the FlightStream from the client
- while reading the FlightStream, directly get the VectorSchemaRoot provided by the server
Let's run the client (using java -jar, for instance). We can see on the console:
[main] INFO org.apache.arrow.memory.BaseAllocator - Debug mode disabled. Enable with the VM option -Darrow.memory.debug.allocator=true.
[main] INFO org.apache.arrow.memory.DefaultAllocationManagerOption - allocation manager type not specified, using netty as the default type
[main] INFO org.apache.arrow.memory.CheckAllocator - Using DefaultAllocationManager at memory-netty/14.0.1/arrow-memory-netty-14.0.1.jar!/org/apache/arrow/memory/DefaultAllocationManagerFactory.class
Client connected to grpc+tcp://localhost:33333
Flight info: FlightInfo{schema=Schema, descriptor=org.apache.karaf.shell, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@3369b90d, expirationTime=(none)}], bytes=-1, records=12, ordered=false}
Row count: 12
key value
completionMode GLOBAL
disableEofExit false
disableLogout false
felix.fileinstall.filename file:/Users/jbonofre/Downloads/apache-karaf-4.4.4/etc/org.apache.karaf.shell.cfg
hostKey /Users/jbonofre/Downloads/apache-karaf-4.4.4/etc/host.key
service.pid org.apache.karaf.shell
sftpEnabled true
sshHost 0.0.0.0
sshIdleTimeout 1800000
sshPort 8101
sshRealm karaf
sshRole ssh
We retrieved all the properties of the org.apache.karaf.shell configuration remotely using Arrow Flight!
This blog post is the first one of a series about Apache Arrow, Arrow Flight, Arrow Flight SQL, etc.
Enjoy!