Keywords: java, command line, argument parsing, CommandLineParser
Date: August 2017

Maven

<dependency>
    <groupId>commons-cli</groupId>
    <artifactId>commons-cli</artifactId>
    <version>1.4</version>
</dependency>

Code

CommandLineParser parser = new DefaultParser();
Options options = new Options();

options.addOption("h", "help", false, "Print usages.");
// "h": short option name, used as -h
// "help": long option name, used as --help
// false: the option takes no argument
// "Print usages.": option description

options.addOption("l", "listen", true, "Address to listen on. Example: 192.168.1.100:8888");
// true: the option takes an argument

options.addOption(null, "zookeeper", true, "zookeeper address to be connected.");
// null short name: the option can only be used in its long form

CommandLine cl;
try {
    cl = parser.parse(options, args);
    if (cl.hasOption('h')) {
        System.out.println("h");
    }
    if (cl.hasOption("listen")) {
        System.out.println(cl.getOptionValue("listen"));
    }
} catch (ParseException e) {
    e.printStackTrace();
}
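
The "Print usages." description above can actually be rendered with commons-cli's HelpFormatter; a minimal sketch (the program name "app" is just a placeholder):

if (cl.hasOption('h')) {
    // printHelp lists every registered option together with its description
    new HelpFormatter().printHelp("app", options);
}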

Keywords: jstorm, helloworld, hello
Date: March 2017

Contents

Preface
JAR dependencies
Main code
Running on the cluster

Preface

1. The key method of a spout is nextTuple(); once a topology is running, the cluster calls nextTuple() repeatedly. nextTuple() should call collector.emit(), which delivers a tuple to the downstream bolt.
2. The key method of a bolt is execute(Tuple input); the input argument comes from an upstream collector.emit(). A bolt can also call collector.emit() to pass a tuple on to a downstream bolt.
3. This program builds a spout-001 -> bolt-001 -> bolt-002 topology.

JAR dependencies

1. A JStorm program needs the jstorm-core JAR; the current version is 2.1.1.
2. jstorm-core depends on slf4j-api, but is only compatible with slf4j-api 1.5.5 or lower.

<dependency>
    <groupId>com.alibaba.jstorm</groupId>
    <artifactId>jstorm-core</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.5.5</version>
    <scope>provided</scope>
</dependency>

Main code

/*
 * App.java
 */
public class App 
{
    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException
    {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout-001", new FirstSpout());
        builder.setBolt("bolt-001", new FirstBolt()).shuffleGrouping("spout-001");
        builder.setBolt("bolt-002", new SecondBolt()).shuffleGrouping("bolt-001");
        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(5);
        config.setMaxTaskParallelism(3);

        /* Debugging in local mode */
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("hello", config, builder.createTopology());

        /* Running on a JStorm cluster */
        //StormSubmitter.submitTopology("hello", config, builder.createTopology());
    }
}
/*
 * FirstSpout.java
 */
public class FirstSpout extends BaseRichSpout {

    private static final long serialVersionUID = 2097335660808026799L;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        collector.emit(new Values("hello"));
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("field1"));
    }
}
/*
 * FirstBolt.java
 */
public class FirstBolt extends BaseRichBolt {

    private static final long serialVersionUID = -7866785982451843573L;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String s = (String) input.getValueByField("field1");
        System.out.printf("bolt-001 received '%s' from spout-001.\n", s);
        s += " world";
        collector.emit(new Values(s));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("field2"));
    }
}
/*
 *  SecondBolt.java
 */
public class SecondBolt extends BaseRichBolt {

    private static final long serialVersionUID = 8910534863307440929L;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

    }

    @Override
    public void execute(Tuple input) {
        // The upstream FirstBolt declares its output field as "field2".
        String s = (String) input.getValueByField("field2");
        System.out.printf("bolt-002 received '%s' from bolt-001\n", s);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

Running on the cluster

1. The code must call StormSubmitter.submitTopology();
2. Build a JAR and upload it to the nimbus node;
3. On nimbus, run the shell command jstorm jar hello.jar com.xxxx.hello.App.
Note: com.xxxx.hello.App is the class that contains main().

Keywords: jedis, java, redis, jedis-2.9.0
Date: March 2017

Contents

1. Key points
2. Version
3. Code

Key points

a) Pipelining performs far better than issuing commands one at a time (see the sketch after this list).
b) Cluster mode does not support pipelining.
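
A minimal sketch of what point (a) means in practice, assuming a reachable Redis instance at 172.19.1.105:6379 (the key count and key names are placeholders):

Jedis jedis = new Jedis("172.19.1.105", 6379);

// Without pipelining: every SET pays a full network round trip.
long t1 = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
    jedis.set("plain:" + i, "value" + i);
}
long t2 = System.currentTimeMillis();

// With pipelining: commands are buffered locally and flushed together by sync().
Pipeline pl = jedis.pipelined();
for (int i = 0; i < 10000; i++) {
    pl.set("pipe:" + i, "value" + i);
}
pl.sync();
long t3 = System.currentTimeMillis();

System.out.printf("plain: %d ms, pipelined: %d ms\n", t2 - t1, t3 - t2);
jedis.close();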

Version

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

Code

Connection

Jedis jedis = new Jedis("172.19.1.105", 6379);
jedis.set("key", "value");
System.out.println(jedis.get("key"));
jedis.close();

Connection pool

JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(5);
config.setMaxIdle(3);
config.setMaxWaitMillis(1000);
JedisPool pool = new JedisPool(config, "172.19.1.105", 6379);
Jedis jedis = pool.getResource();
jedis.set("key", "value");
System.out.println(jedis.get("key"));
jedis.close(); // returns the connection to the pool
pool.close();

Cluster connection

Set<HostAndPort> nodes = new LinkedHashSet<HostAndPort>();
nodes.add(new HostAndPort("172.19.1.101", 6379));
nodes.add(new HostAndPort("172.19.1.102", 6379));
nodes.add(new HostAndPort("172.19.1.103", 6379));
nodes.add(new HostAndPort("172.19.1.104", 6379));
nodes.add(new HostAndPort("172.19.1.105", 6379));
nodes.add(new HostAndPort("172.19.1.106", 6379));
        
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(5);
config.setMaxIdle(3);
config.setMaxWaitMillis(1000);
        
JedisCluster cluster = new JedisCluster(nodes, config);
cluster.set("key001", "value00001");
try {
	cluster.close();
} catch (IOException e) {
	e.printStackTrace();
}

Pipeline

Jedis jedis = new Jedis("172.19.1.105", 6379);
Pipeline pl = jedis.pipelined();
pl.set("k1", "v1");
pl.set("k2", "v2");
pl.set("k3", "v3");
pl.sync();
try {
    pl.close();
} catch (IOException e) {
    e.printStackTrace();
}
jedis.close();

Keywords: hbase, java, 1.2.4, client
Date: March 2017

Contents

Key concepts
Preparing test data
Code and notes

Key concepts

Column-oriented storage

HBase stores data in a column-oriented fashion, organized around five concepts: table, row, column, value, and timestamp:
a) a table is a collection of data of the same kind;
b) a row key lets you locate a single record quickly;
c) one record contains multiple columns;
d) each column can contain multiple KeyValues;
e) each column has one or more values;
f) each value has a timestamp.

row      column            timestamp       value
000001   article:title     1488356445671   失控
         article:tag       1488356965236   蜂群,分布式,哲学
         article:content   1488356991430
         author:name       1488356945690   凯文·凯利
         author:nickname   1488356945698   KK
000002

Java classes

The HBase Java client JAR, hbase-client-1.2.4.jar, provides the key classes HBaseConfiguration, ConnectionFactory, Connection, Table, Get, Put, Delete, Scan, ResultScanner, Result, and Cell:
a) ConnectionFactory creates a Connection from an HBaseConfiguration;
b) a Connection provides access to a Table;
c) a Table performs Get, Put, Delete, etc. and returns a Result;
d) a Table can also run a Scan and return a ResultScanner;
e) a ResultScanner yields Results;
f) a Result contains multiple Cells;
g) a Cell has row, family, qualifier, timestamp, and value attributes.

Preparing test data

hadoop@master:~/hbase-1.2.4$ bin/hbase shell
hbase(main):001:0> put 't1','r20170228','f1:q1','123456'
hbase(main):002:0> put 't1','r20170228','f1:q2','abcdef'
hbase(main):003:0> put 't1','r20170301','f1:q1','1234567890'
hbase(main):004:0> put 't1','r20170301','f1:q2','abcdefgh'
hbase(main):005:0> put 't1','r20170302','f1:q1','1111'
hbase(main):006:0> put 't1','r20170302','f1:q2','aaaa'
hbase(main):007:0> scan 't1'
...

Code and notes

Many other articles still use classes such as HTable, HTablePool and HColumnDescriptor, which are outdated or no longer the officially recommended approach. In client version 1.2.4, HTable is annotated @InterfaceAudience.Private, HTablePool is @Deprecated, and HColumnDescriptor can no longer be found.

package com.fish2bird.hbaseclientdemo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.PrefixFilter;

public class App {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zookeeper1,zookeeper2,zookeeper3");
        // Only the zookeeper quorum needs to be configured; the remaining parameters are fetched from zookeeper automatically.

        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("t1"));

        // Read the single row whose key is "r20170301"
        Get get = new Get("r20170301".getBytes());
        Result result1 = table.get(get);
        if (!result1.isEmpty()) {
            for (Cell cell : result1.rawCells()) {
                System.out.printf("%s,%s,%s,%d,%s\n",
                        new String(cell.getRow()),
                        new String(cell.getFamily()),
                        new String(cell.getQualifier()),
                        cell.getTimestamp(),
                        new String(cell.getValue()));
            }
        }

        // Read all rows whose key starts with "r2017"
        PrefixFilter filter = new PrefixFilter("r2017".getBytes());
        Scan s = new Scan("r2017".getBytes(), filter);
        ResultScanner scanner = table.getScanner(s);
        for (Result result2 : scanner) {
            for (Cell cell : result2.rawCells()) {
                System.out.printf("%s,%s,%s,%d,%s\n",
                        new String(cell.getRow()),
                        new String(cell.getFamily()),
                        new String(cell.getQualifier()),
                        cell.getTimestamp(),
                        new String(cell.getValue()));
            }
        }

        // Write one row
        Put put = new Put("r10000".getBytes());
        put.addColumn("f1".getBytes(), "q1".getBytes(), "a1b2c3".getBytes());
        put.addColumn("f1".getBytes(), "q2".getBytes(), "a1b2c3a1b2c3".getBytes());
        table.put(put);

        table.close();
        connection.close();
    }

}

Note: getRow(), getFamily(), getQualifier(), getValue(), etc. are marked @Deprecated, but the replacement methods getRowArray(), getFamilyArray(), getQualifierArray() and getValueArray() all return the same backing byte array, so on their own they do not give the individual fields; they have to be combined with the corresponding offset/length accessors or with the CellUtil.cloneXxx() helpers (see the sketch below).
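
For reference, a minimal sketch of the non-deprecated way to read a Cell in 1.2.4 using the CellUtil helpers; the loop body is a drop-in replacement for the printf calls above:

import org.apache.hadoop.hbase.CellUtil;

for (Cell cell : result1.rawCells()) {
    System.out.printf("%s,%s,%s,%d,%s\n",
            new String(CellUtil.cloneRow(cell)),        // copies just the row bytes out of the backing array
            new String(CellUtil.cloneFamily(cell)),
            new String(CellUtil.cloneQualifier(cell)),
            cell.getTimestamp(),
            new String(CellUtil.cloneValue(cell)));
}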