HBase Phoenix UDFs的实现
本文介绍 HBase 上 Phoenix UDF(用户自定义函数)的实现。HBase 的版本是 0.98,Phoenix 也需要选择与之对应的版本。
参考文章:
http://phoenix.apache.org/udf.html
http://phoenix-hbase.blogspot.com/2013/04/how-to-add-your-own-built-in-function.html (需要翻墙才能打开,而且这篇文章比较旧,是 2013 年的)
官网说,Phoenix 从 4.4.0 版本开始才支持用户自定义函数(UDF)。以前在 3.0 版本里,我们都是把自定义函数写成系统内置函数,这需要修改并重新编译 Phoenix 的源码,对后期的版本升级很不友好。因此 4.4.0 版本可以说有它的积极意义 O(∩_∩)O哈哈~
直接上代码吧,UrlParseFunction.java类
package phoenix.function; import java.sql.SQLException; import java.util.List; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.function.ScalarFunction; import org.apache.phoenix.parse.FunctionParseNode.Argument; import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PVarchar; import phoenix.util.DiscoverUrlUtil; import phoenix.util.DomainInfo; @BuiltInFunction(name=UrlParseFunction.NAME, args={ @Argument(allowedTypes={PVarchar.class}), @Argument(allowedTypes={PInteger.class})} ) public class UrlParseFunction extends ScalarFunction { public static final String NAME = "URLPARSE"; private static final int TYPE1 = 1; private static final int TYPE2 = 2; private static final int TYPE3 = 3; public UrlParseFunction() { } public UrlParseFunction(List children) throws SQLException { super(children); } @Override public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { Expression strExpression = getStrExpression(); if (!strExpression.evaluate(tuple, ptr)) { return false; } String sourceStr = (String)PVarchar.INSTANCE.toObject(ptr, strExpression.getSortOrder()); if (sourceStr == null) { return true; } Expression typeExpression = getTypeExpression(); if (!typeExpression.evaluate(tuple, ptr)) { return false; } int panelType = (Integer)PInteger.INSTANCE.toObject(ptr, typeExpression.getSortOrder()); if (panelType == TYPE1) { String topDomain = DomainInfo.getDomainGroup(sourceStr); ptr.set(PVarchar.INSTANCE.toBytes(topDomain != null ? 
topDomain : "other")); } else if (panelType == TYPE2) { String sClassificationName = DiscoverUrlUtil.getSClassificationName(sourceStr); ptr.set(PVarchar.INSTANCE.toBytes(sClassificationName)); } else if (panelType == TYPE3) { String urlName = DiscoverUrlUtil.getUrlName(sourceStr); ptr.set(PVarchar.INSTANCE.toBytes(urlName)); } else { throw new IllegalStateException("parse type should be in (1,2,3)."); } return true; } @Override public PDataType getDataType() { return getStrExpression().getDataType(); } @Override public boolean isNullable() { return getStrExpression().isNullable(); } @Override public String getName() { return NAME; } private Expression getStrExpression() { return children.get(0); } private Expression getTypeExpression() { return children.get(1); } }
其中,DomainInfo.java 和 DiscoverUrlUtil.java 是两个工具类,用于获取 url 对应的中文名称。
下面这两个方法是必须重写的:
@Override public PDataType getDataType() { return getStrExpression().getDataType(); } @Override public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { Expression strExpression = getStrExpression(); if (!strExpression.evaluate(tuple, ptr)) { return false; } ...... return true; }
pom.xml代码:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the UDF project; the build output is a plain jar. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- Coordinates: the resulting artifact is phoenix-1.0-SNAPSHOT.jar. -->
  <groupId>com.youku</groupId>
  <artifactId>phoenix</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <dependencies>
    <dependency>
      <groupId>sqlline</groupId>
      <artifactId>sqlline</artifactId>
      <version>1.1.9</version>
    </dependency>
    <!-- Phoenix core, 4.4.0 build for HBase 0.98 (per the version string). -->
    <dependency>
      <groupId>org.apache.phoenix</groupId>
      <artifactId>phoenix-core</artifactId>
      <version>4.4.0-HBase-0.98</version>
      <!-- hbase-server and hadoop-core are kept out of the transitive dependency set. -->
      <exclusions>
        <exclusion>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-server</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-core</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>
</project>
以上代码需要打包成 jar 包,上传到 HBase 集群的 HDFS 上(上传命令见下文的 hadoop fs 命令);然后进入 Phoenix 的 shell,输入下边的语句,将函数注册到 Phoenix 里。
-- Register the UDF class from the uploaded jar as URLPARSE(VARCHAR, INTEGER) -> VARCHAR.
CREATE FUNCTION URLPARSE(VARCHAR, INTEGER)
    RETURNS VARCHAR
    AS 'phoenix.function.UrlParseFunction'
    USING JAR 'hdfs://0.0.0.0:8010/hbase/udf/phoenix-1.0-SNAPSHOT.jar';
# Remove the previously uploaded copy of the UDF jar from HDFS.
hadoop fs -rm /hbase/udf/phoenix-1.0-SNAPSHOT.jar
# Upload the freshly built jar into the HDFS directory referenced by CREATE FUNCTION.
hadoop fs -put /root/test/phoenix-1.0-SNAPSHOT.jar /hbase/udf/
PS:hbase-site.xml 也需要修改,并将修改后的文件放到 Phoenix 的客户端里;只需要修改客户端的配置即可。
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-->
<configuration>
  <!-- Switch that allows user-defined functions in Phoenix; set to true here. -->
  <property>
    <name>phoenix.functions.allowUserDefinedFunctions</name>
    <value>true</value>
  </property>
  <!-- File system implementation class used for the hdfs scheme. -->
  <property>
    <name>fs.hdfs.impl</name>
    <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
  </property>
  <!-- Resolves relative to hbase.rootdir, which is defined further down in this file. -->
  <property>
    <name>hbase.dynamic.jars.dir</name>
    <value>${hbase.rootdir}/lib</value>
    <description>
      The directory from which the custom udf jars can be loaded dynamically by the phoenix client/region server without the need to restart. However, an already loaded udf class would not be un-loaded. See HBASE-1936 for more details.
    </description>
  </property>
  <property>
    <name>hbase.master</name>
    <value>0.0.0.0:6000</value>
  </property>
  <property>
    <name>hbase.master.maxclockskew</name>
    <value>180000</value>
  </property>
  <!-- HBase root directory on HDFS; same host and port as the jar URL used in CREATE FUNCTION. -->
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://0.0.0.0:8010/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.tmp.dir</name>
    <value>/opt/hbase/tmp</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>localhost</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/opt/storage/zookeeper</value>
    <description>Property from ZooKeeper's config zoo.cfg. The directory where the snapshot is stored.
    </description>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
最后的效果:
0: jdbc:phoenix:localhost:2181> select urlparse(cat4,3) from yk.video_channel_source where videoid = 100059807; +------------------------------------------+ | URLPARSE(CAT4, 3) | +------------------------------------------+ | www.baidu.com | +------------------------------------------+ 1 row selected (0.081 seconds) 0: jdbc:phoenix:localhost:2181> select urlparse(cat4,2) from yk.video_channel_source where videoid = 100059807; +------------------------------------------+ | URLPARSE(CAT4, 2) | +------------------------------------------+ | 其他 | +------------------------------------------+ 1 row selected (0.104 seconds) 0: jdbc:phoenix:localhost:2181> select urlparse(cat4,1) from yk.video_channel_source where videoid = 100059807; +------------------------------------------+ | URLPARSE(CAT4, 1) | +------------------------------------------+ | *.baidu.com | +------------------------------------------+ 1 row selected (0.086 seconds)