本文介绍如何在基于 HBase 的 Phoenix 中实现 UDF(用户自定义函数)。这里 HBase 的版本是 0.98,Phoenix 也需要选择与之对应的版本。
参考文章:
http://phoenix.apache.org/udf.html
http://phoenix-hbase.blogspot.com/2013/04/how-to-add-your-own-built-in-function.html(翻墙才能打开,而且这篇文章很旧,2013年的)
官网说,Phoenix 从 4.4.0 版本开始才支持用户自定义函数(UDF)。以前在 3.0 版本里,我们都是把自定义函数写到系统内置函数里,这需要重新编译 Phoenix 的源码,对后期的版本升级很不友好。因此可以说 4.4.0 版本有它的积极意义 O(∩_∩)O哈哈~
直接上代码吧,UrlParseFunction.java类
package phoenix.function;
import java.sql.SQLException;
import java.util.List;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.function.ScalarFunction;
import org.apache.phoenix.parse.FunctionParseNode.Argument;
import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PVarchar;
import phoenix.util.DiscoverUrlUtil;
import phoenix.util.DomainInfo;
/**
 * Phoenix scalar function {@code URLPARSE(url, type)}.
 *
 * <p>The first argument is a VARCHAR URL, the second an INTEGER selecting what
 * to extract from it:
 * <ul>
 *   <li>{@code 1} - result of {@code DomainInfo.getDomainGroup(url)}, or the
 *       literal {@code "other"} when that lookup returns {@code null};</li>
 *   <li>{@code 2} - result of {@code DiscoverUrlUtil.getSClassificationName(url)};</li>
 *   <li>{@code 3} - result of {@code DiscoverUrlUtil.getUrlName(url)}.</li>
 * </ul>
 * Any other type value makes {@link #evaluate} throw {@link IllegalStateException}.
 * If either argument evaluates to SQL NULL the function result is NULL.
 */
@BuiltInFunction(name = UrlParseFunction.NAME, args = {
    @Argument(allowedTypes = {PVarchar.class}),
    @Argument(allowedTypes = {PInteger.class})})
public class UrlParseFunction extends ScalarFunction {

    /** SQL name under which the function is exposed. */
    public static final String NAME = "URLPARSE";

    /** Parse type: domain group of the URL. */
    private static final int TYPE1 = 1;
    /** Parse type: classification name of the URL. */
    private static final int TYPE2 = 2;
    /** Parse type: URL name. */
    private static final int TYPE3 = 3;

    /** No-arg constructor. */
    public UrlParseFunction() {
    }

    /**
     * Creates the function over its argument expressions.
     *
     * @param children argument expressions; index 0 is the URL, index 1 the parse type
     * @throws SQLException propagated from the {@link ScalarFunction} constructor
     */
    public UrlParseFunction(List<Expression> children) throws SQLException {
        super(children);
    }

    /**
     * Evaluates both arguments for the given row and writes the VARCHAR result into {@code ptr}.
     *
     * @param tuple the row being evaluated
     * @param ptr   in/out buffer; on a {@code true} return it holds the result bytes
     * @return {@code false} if either argument could not be evaluated, {@code true} otherwise
     * @throws IllegalStateException if the parse type is not 1, 2 or 3
     */
    @Override
    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
        Expression strExpression = getStrExpression();
        if (!strExpression.evaluate(tuple, ptr)) {
            return false;
        }
        // Decode the URL now: ptr is reused below for the second argument.
        String sourceStr = (String) PVarchar.INSTANCE.toObject(ptr, strExpression.getSortOrder());
        if (sourceStr == null) {
            return true;
        }

        Expression typeExpression = getTypeExpression();
        if (!typeExpression.evaluate(tuple, ptr)) {
            return false;
        }
        // Keep the boxed value: unboxing a null type straight into an int would
        // throw a NullPointerException for a SQL NULL type argument.
        Integer parseType = (Integer) PInteger.INSTANCE.toObject(ptr, typeExpression.getSortOrder());
        if (parseType == null) {
            // Same treatment as a null URL above: report success and leave ptr
            // as the argument expression produced it.
            return true;
        }

        String result;
        switch (parseType.intValue()) {
            case TYPE1:
                String topDomain = DomainInfo.getDomainGroup(sourceStr);
                result = topDomain != null ? topDomain : "other";
                break;
            case TYPE2:
                result = DiscoverUrlUtil.getSClassificationName(sourceStr);
                break;
            case TYPE3:
                result = DiscoverUrlUtil.getUrlName(sourceStr);
                break;
            default:
                throw new IllegalStateException("parse type should be in (1,2,3).");
        }
        ptr.set(PVarchar.INSTANCE.toBytes(result));
        return true;
    }

    /**
     * Returns the data type of the result, taken from the URL argument's type.
     *
     * <p>NOTE(review): the result bytes are always produced with
     * {@code PVarchar}; consider returning {@code PVarchar.INSTANCE} directly
     * if the first argument may be something other than VARCHAR - confirm.
     */
    @Override
    public PDataType getDataType() {
        return getStrExpression().getDataType();
    }

    /**
     * The result can be null whenever either argument can be null, because
     * {@link #evaluate} yields no value in both cases.
     */
    @Override
    public boolean isNullable() {
        return getStrExpression().isNullable() || getTypeExpression().isNullable();
    }

    @Override
    public String getName() {
        return NAME;
    }

    /** Returns the first argument expression (the URL string). */
    private Expression getStrExpression() {
        return children.get(0);
    }

    /** Returns the second argument expression (the parse type). */
    private Expression getTypeExpression() {
        return children.get(1);
    }
}
其中,DomainInfo.java和DiscoverUrlUtil.java是两个工具类,获取url对应的中文名称。
下面这两个方法是必须重写的:
// Reports the data type of the function's result; here it is taken from the
// first (string) argument expression.
@Override
public PDataType getDataType() {
return getStrExpression().getDataType();
}
// Computes the function's value for one row and writes it into ptr.
// Returns false when the argument expression cannot be evaluated.
@Override
public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
Expression strExpression = getStrExpression();
if (!strExpression.evaluate(tuple, ptr)) {
return false;
}
......
return true;
}
pom.xml代码:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Coordinates of the UDF project; with jar packaging the artifact is named phoenix-1.0-SNAPSHOT.jar -->
<groupId>com.youku</groupId>
<artifactId>phoenix</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<!-- NOTE(review): the UDF class shown in this article imports nothing from sqlline; confirm this dependency is needed -->
<dependency>
<groupId>sqlline</groupId>
<artifactId>sqlline</artifactId>
<version>1.1.9</version>
</dependency>
<!-- Phoenix core built for HBase 0.98; the UDF class imports its org.apache.phoenix.* types -->
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>4.4.0-HBase-0.98</version>
<!-- hbase-server and hadoop-core are excluded from phoenix-core's transitive dependencies -->
<exclusions>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
以上代码需要打包成jar包,上传到HBase集群的HDFS上,并在进入Phoenix的shell以后,输入下边的代码,将函数注册到Phoenix里。
-- Registers URLPARSE(varchar, integer) returning varchar, implemented by class phoenix.function.UrlParseFunction in the given HDFS jar.
CREATE FUNCTION URLPARSE(varchar,integer) returns varchar as 'phoenix.function.UrlParseFunction' using jar 'hdfs://0.0.0.0:8010/hbase/udf/phoenix-1.0-SNAPSHOT.jar';
完整的代码结构:
hadoop jar包移除和上传命令:
# Remove the previously uploaded UDF jar from HDFS.
hadoop fs -rm /hbase/udf/phoenix-1.0-SNAPSHOT.jar
# Upload the newly built jar from the local directory /root/test into /hbase/udf/ on HDFS.
hadoop fs -put /root/test/phoenix-1.0-SNAPSHOT.jar /hbase/udf/
PS:hbase-site.xml 也需要做修改:把修改后的文件上传到 Phoenix 的客户端即可,只需要修改客户端这一份。
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<!-- Switch for user-defined functions in Phoenix; set to true here so that CREATE FUNCTION can be used -->
<property>
<name>phoenix.functions.allowUserDefinedFunctions</name>
<value>true</value>
</property>
<!-- FileSystem implementation class used for hdfs:// URIs -->
<property>
<name>fs.hdfs.impl</name>
<value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
</property>
<!-- With the hbase.rootdir value set in this file, this resolves to hdfs://0.0.0.0:8010/hbase/lib.
     NOTE(review): the article uploads the UDF jar to /hbase/udf/ rather than /hbase/lib; confirm the intended directory. -->
<property>
<name>hbase.dynamic.jars.dir</name>
<value>${hbase.rootdir}/lib</value>
<description>
The directory from which the custom udf jars can be loaded
dynamically by the phoenix client/region server without the need to restart. However,
an already loaded udf class would not be un-loaded. See
HBASE-1936 for more details.
</description>
</property>
<property>
<name>hbase.master</name>
<value>0.0.0.0:6000</value>
</property>
<property>
<name>hbase.master.maxclockskew</name>
<value>180000</value>
</property>
<!-- Uses the same HDFS address (0.0.0.0:8010) as the jar path in the CREATE FUNCTION statement -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://0.0.0.0:8010/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.tmp.dir</name>
<value>/opt/hbase/tmp</value>
</property>
<!-- Matches the host in the JDBC prompt shown in the article (jdbc:phoenix:localhost:2181) -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>localhost</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/opt/storage/zookeeper</value>
<description>Property from ZooKeeper's config zoo.cfg.
The directory where the snapshot is stored.
</description>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
最后的效果:
0: jdbc:phoenix:localhost:2181> select urlparse(cat4,3) from yk.video_channel_source where videoid = 100059807;
+------------------------------------------+
| URLPARSE(CAT4, 3) |
+------------------------------------------+
| www.baidu.com |
+------------------------------------------+
1 row selected (0.081 seconds)
0: jdbc:phoenix:localhost:2181> select urlparse(cat4,2) from yk.video_channel_source where videoid = 100059807;
+------------------------------------------+
| URLPARSE(CAT4, 2) |
+------------------------------------------+
| 其他 |
+------------------------------------------+
1 row selected (0.104 seconds)
0: jdbc:phoenix:localhost:2181> select urlparse(cat4,1) from yk.video_channel_source where videoid = 100059807;
+------------------------------------------+
| URLPARSE(CAT4, 1) |
+------------------------------------------+
| *.baidu.com |
+------------------------------------------+
1 row selected (0.086 seconds)