Skip to content

Flume 从 RabbitMQ 收集数据保存到 Hadoop(Windows 环境)

🏷️ Flume RabbitMQ Hadoop

之前公司使用 ELK 保存日志,将日志发送到 RabbitMQ,然后再用 Logstash 收集日志到 ElasticSearch。

这里使用 Flume 将日志保存到 Hadoop,日志源正好可以重用之前 RabbitMQ 中的日志,只需要在 Exchange 中新增一个 binding 就行了。

1. 下载 Flume 和 安装 rabbitmq-flume-plugin 插件

下载 apache-flume-1.8.0-bin.tar.gz 后解压到 D:\flume\flume-1.8.0 目录。

从 RabbitMQ 读取日志需要单独安装 rabbitmq-flume-plugin 插件。
最新版下载地址:rabbitmq-flume-plugin-standalone-1.0.3.jar
下载后复制到 D:\flume\flume-1.8.0\lib 目录即可。

2. 从 RabbitMQ 读取日志保存到文件

创建配置文件 D:\flume\flume-1.8.0\conf\rabbitmq-flume-logger.properties

properties
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# The channel can be defined as follows.
a1.sources.r1.channels = c1

a1.sources.r1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
a1.sources.r1.host = 192.168.0.1
a1.sources.r1.port = 5672
a1.sources.r1.virtual-host = /
a1.sources.r1.username = username
a1.sources.r1.password = password
a1.sources.r1.queue = queue_log
a1.sources.r1.prefetchCount = 10

# Each sink's type must be defined
a1.sinks.k1.type = logger

# Specify the channel the sink should use
a1.sinks.k1.channel = c1

# Each channel's type is defined.
a1.channels.c1.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
a1.channels.c1.capacity = 100

启动 flume agent

bash
D:\flume\flume-1.8.0>bin\flume-ng agent -n a1 -c conf -f conf/rabbitmq-flume-logger.properties

不知道是不是哪里的配置问题,日志内容只有最前面的 16 个字符被保存了下来。因为只是为了测试从 RabbitMQ 的读取,所以也就没细查。

bash
03 九月 2019 14:48:10,241 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.LoggerSink.process:95)  - Event: { headers:{exchange=logstash, routing-key=service.netcore} body: 7B 0A 20 20 22 6C 65 76 65 6C 22 3A 20 22 45 72 {.  "level": "Er }

3. 从 Rabbit MQ 读取日志保存到 Hadoop

如何在 Windows 上安装 Hadoop 见 这里

创建配置文件 D:\flume\flume-1.8.0\conf\rabbitmq-flume-hadoop.properties

properties
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.channels = c1
a1.sources.r1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
a1.sources.r1.host = 192.168.0.1
a1.sources.r1.port = 5672
a1.sources.r1.virtual-host = /
a1.sources.r1.username = username
a1.sources.r1.password = password
a1.sources.r1.queue = queue_log
a1.sources.r1.prefetchCount = 10

a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/liujiajia/flume/%y-%m-%d/%H/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.channels.c1.type = memory
a1.channels.c1.capacity = 100

启动 flume agent

bash
D:\flume\flume-1.8.0>bin\flume-ng agent -n a1 -c conf -f conf/rabbitmq-flume-hadoop.properties

4. 导出到 RabbitMQ

创建配置文件 D:\flume\flume-1.8.0\conf\rabbitmq-flume-rabbitmq.properties

properties
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.channels = c1
a1.sources.r1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
a1.sources.r1.host = 192.168.0.1
a1.sources.r1.port = 5672
a1.sources.r1.virtual-host = /
a1.sources.r1.username = username
a1.sources.r1.password = password
a1.sources.r1.queue = queue_log
a1.sources.r1.prefetchCount = 10

a1.sinks.k1.channel = c1
a1.sinks.k1.type = com.aweber.flume.sink.rabbitmq.RabbitMQSink
a1.sinks.k1.host = 192.168.0.1
a1.sinks.k1.port = 5672
a1.sinks.k1.virtual-host = /
a1.sinks.k1.username = username
a1.sinks.k1.password = password
a1.sinks.k1.exchange = flume.sink
a1.sinks.k1.routing-key = service.flume
a1.sinks.k1.publisher-confirms = true

a1.channels.c1.type = memory
a1.channels.c1.capacity = 100

启动 flume agent

bash
D:\flume\flume-1.8.0>bin\flume-ng agent -n a1 -c conf -f conf/rabbitmq-flume-rabbitmq.properties

附 1. Could not configure sink k1 due to: No channel configured for sink: k1 错误

配置导出到 RabbitMQ 时发生了这个错误。具体日志如下:

txt
Could not configure sink  k1 due to: No channel configured for sink: k1
org.apache.flume.conf.ConfigurationException: No channel configured for sink: k1
	at org.apache.flume.conf.sink.SinkConfiguration.configure(SinkConfiguration.java:52)
	at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:680)
	at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:347)
	at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:212)
	at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:126)
	at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:108)
	at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:194)
	at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:93)
	at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

明明配置了 sink 的 channel 为什么会报错呢?搜了一下还真有遇到同样问题的人
原因竟然是 a1.sinks.k1.channel 配置项中的 channel 是不加 s 的。
细追源头才发现 rabbitmq-flume-plugin 插件的文档示例上写的就是错的。
=.=|||

附 2. flume-ng agent 启动时显示 Class path contains multiple SLF4J bindings. 警告

txt
Including Hadoop libraries found in (D:\hadoop-3.2.0) for DFS access
WARN: HBASE_HOME not found
WARN: HIVE_HOME not found

  Running FLUME agent :
    class: org.apache.flume.node.Application
    arguments: -n $agent_name -f "D:\flume\flume-1.8.0\conf\rabbitmq-flumn-haddop.properties"

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/flume/flume-1.8.0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/hadoop-3.2.0/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

这个是由于 jar 包重复导致的。倒也不影响使用,若要去掉警告,删除 flume 目录下的 slf4j-log4j12-1.6.1.jar 文件就可以了。

附 3. 发送到 Hadoop 时报错: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

错误消息内容如下:

txt
process failed
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
	at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:251)
	at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:460)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:368)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:745)
Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:451)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
	at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:251)
	at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:460)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:368)
	... 3 more
Error writing to channel for com.aweber.flume.source.rabbitmq.Consumer@5d8a78d8, message rejected org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight

HDFSEventSink.javaprocess 方法中获取路径名(String realPath = BucketPath.escapeString)时报错了。

java
/**
* Pull events out of channel and send it to HDFS. Take at most batchSize
* events per Transaction. Find the corresponding bucket for the event.
* Ensure the file is open. Serialize the data and write it to the file on
* HDFS. <br/>
* This method is not thread safe.
*/
public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    transaction.begin();
    try {
        Set<BucketWriter> writers = new LinkedHashSet<>();
        int txnEventCount = 0;
        for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
            Event event = channel.take();
            if (event == null) {
            }

            // reconstruct the path name by substituting place holders
            String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
                timeZone, needRounding, roundUnit, roundValue, useLocalTime);
            String realName = BucketPath.escapeString(fileName, event.getHeaders(),
                timeZone, needRounding, roundUnit, roundValue, useLocalTime);

            String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
            BucketWriter bucketWriter;
            HDFSWriter hdfsWriter = null;
            // Callback to remove the reference to the bucket writer from the
            // sfWriters map so that all buffers used by the HDFS file
            // handles are garbage collected.
            WriterCallback closeCallback = new WriterCallback() {
                @Override
                public void run(String bucketPath) {
                    LOG.info("Writer callback called.");
                    synchronized (sfWritersLock) {
                        sfWriters.remove(bucketPath);
                    }
                }
            };
            synchronized (sfWritersLock) {
                bucketWriter = sfWriters.get(lookupPath);
                // we haven't seen this file yet, so open it and cache the handle
                if (bucketWriter == null) {
                    hdfsWriter = writerFactory.getWriter(fileType);
                    bucketWriter = initializeBucketWriter(realPath, realName,
                        lookupPath, hdfsWriter, closeCallback);
                    sfWriters.put(lookupPath, bucketWriter);
                }
            }

            // Write the data to HDFS
            try {
                bucketWriter.append(event);
            } catch (BucketClosedException ex) {
                LOG.info("Bucket was closed while trying to append, " +
                        "reinitializing bucket and writing event.");
                hdfsWriter = writerFactory.getWriter(fileType);
                bucketWriter = initializeBucketWriter(realPath, realName,
                lookupPath, hdfsWriter, closeCallback);
                synchronized (sfWritersLock) {
                    sfWriters.put(lookupPath, bucketWriter);
                }
                bucketWriter.append(event);
            }

            // track the buckets getting written in this transaction
            if (!writers.contains(bucketWriter)) {
                writers.add(bucketWriter);
            }
        }

        if (txnEventCount == 0) {
            sinkCounter.incrementBatchEmptyCount();
        } else if (txnEventCount == batchSize) {
            sinkCounter.incrementBatchCompleteCount();
        } else {
            sinkCounter.incrementBatchUnderflowCount();
        }

        // flush all pending buckets before committing the transaction
        for (BucketWriter bucketWriter : writers) {
            bucketWriter.flush();
        }

        transaction.commit();

        if (txnEventCount < 1) {
            return Status.BACKOFF;
        } else {
            sinkCounter.addToEventDrainSuccessCount(txnEventCount);
            return Status.READY;
        }
    } catch (IOException eIO) {
        transaction.rollback();
        LOG.warn("HDFS IO error", eIO);
        return Status.BACKOFF;
    } catch (Throwable th) {
        transaction.rollback();
        LOG.error("process failed", th);
        if (th instanceof Error) {
            throw (Error) th;
        } else {
            throw new EventDeliveryException(th);
        }
    } finally {
        transaction.close();
    }
}

具体是在 BucketPath.javareplaceShorthand 方法中报出的错误。
从代码中可以看出当不使用本地系统时间时,会使用 MQ 消息中 timestamp Header 的值,然后会验证该值是否为 nullPreconditions.checkNotNull),若为 null 则会抛出异常。
感觉这里像是个 bug,因为文档上说当 timestamp Header 不存在时默认会使用系统时间。

java
/**
* Not intended as a public API
*/
@VisibleForTesting
protected static String replaceShorthand(char c, Map<String, String> headers,
    TimeZone timeZone, boolean needRounding, int unit, int roundDown,
    boolean useLocalTimestamp, long ts) {

    String timestampHeader = null;
    try {
        if (!useLocalTimestamp) {
            timestampHeader = headers.get("timestamp");
            Preconditions.checkNotNull(timestampHeader, "Expected timestamp in " +
                "the Flume event headers, but it was null");
            ts = Long.valueOf(timestampHeader);
        } else {
            timestampHeader = String.valueOf(ts);
        }
    } catch (NumberFormatException e) {
        throw new RuntimeException("Flume wasn't able to parse timestamp header"
        + " in the event to resolve time based bucketing. Please check that"
        + " you're correctly populating timestamp header (for example using"
        + " TimestampInterceptor source interceptor).", e);
    }

    if (needRounding) {
        ts = roundDown(roundDown, unit, ts, timeZone);
    }

    // It's a date
    String formatString = "";
    switch (c) {
        case '%':
            return "%";
        case 'a':
            formatString = "EEE";
            break;
        case 'A':
            formatString = "EEEE";
            break;
        case 'b':
            formatString = "MMM";
            break;
        case 'B':
            formatString = "MMMM";
            break;
        case 'c':
            formatString = "EEE MMM d HH:mm:ss yyyy";
            break;
        case 'd':
            formatString = "dd";
            break;
        case 'e':
            formatString = "d";
            break;
        case 'D':
            formatString = "MM/dd/yy";
            break;
        case 'H':
            formatString = "HH";
            break;
        case 'I':
            formatString = "hh";
            break;
        case 'j':
            formatString = "DDD";
            break;
        case 'k':
            formatString = "H";
            break;
        case 'l':
            formatString = "h";
            break;
        case 'm':
            formatString = "MM";
            break;
        case 'M':
            formatString = "mm";
            break;
        case 'n':
            formatString = "M";
            break;
        case 'p':
            formatString = "a";
            break;
        case 's':
            return "" + (ts / 1000);
        case 'S':
            formatString = "ss";
            break;
        case 't':
            // This is different from unix date (which would insert a tab character
            // here)
            return timestampHeader;
        case 'y':
            formatString = "yy";
            break;
        case 'Y':
            formatString = "yyyy";
            break;
        case 'z':
            formatString = "ZZZ";
            break;
        default:
            // LOG.warn("Unrecognized escape in event format string: %" + c);
            return "";
    }

    SimpleDateFormat format = getSimpleDateFormat(formatString);
    if (timeZone != null) {
        format.setTimeZone(timeZone);
    } else {
        format.setTimeZone(TimeZone.getDefault());
    }

    Date date = new Date(ts);
    return format.format(date);
}

HDFSEventSink.javaconfigure 方法中对该参数赋的值,其值来自 hdfs.useLocalTimeStamp 配置项。
将该配置项设置为 true 即可避免该错误。

java
// read configuration and setup thresholds
@Override
public void configure(Context context) {
    this.context = context;

    filePath = Preconditions.checkNotNull(
        context.getString("hdfs.path"), "hdfs.path is required");
    fileName = context.getString("hdfs.filePrefix", defaultFileName);
    this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix);
    inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix);
    inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);
    String tzName = context.getString("hdfs.timeZone");
    timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
    rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval);
    rollSize = context.getLong("hdfs.rollSize", defaultRollSize);
    rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
    batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);
    idleTimeout = context.getInteger("hdfs.idleTimeout", 0);
    String codecName = context.getString("hdfs.codeC");
    fileType = context.getString("hdfs.fileType", defaultFileType);
    maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
    callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
    threadsPoolSize = context.getInteger("hdfs.threadsPoolSize",
        defaultThreadPoolSize);
    rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize",
        defaultRollTimerPoolSize);
    String kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal");
    String kerbKeytab = context.getString("hdfs.kerberosKeytab");
    String proxyUser = context.getString("hdfs.proxyUser");
    tryCount = context.getInteger("hdfs.closeTries", defaultTryCount);
    if (tryCount <= 0) {
        LOG.warn("Retry count value : " + tryCount + " is not " +
            "valid. The sink will try to close the file until the file " +
            "is eventually closed.");
        tryCount = defaultTryCount;
    }
    retryInterval = context.getLong("hdfs.retryInterval", defaultRetryInterval);
    if (retryInterval <= 0) {
        LOG.warn("Retry Interval value: " + retryInterval + " is not " +
            "valid. If the first close of a file fails, " +
            "it may remain open and will not be renamed.");
        tryCount = 1;
    }

    Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0");
    if (codecName == null) {
        codeC = null;
        compType = CompressionType.NONE;
    } else {
        codeC = getCodec(codecName);
        // TODO : set proper compression type
        compType = CompressionType.BLOCK;
    }

    // Do not allow user to set fileType DataStream with codeC together
    // To prevent output file with compress extension (like .snappy)
    if (fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType) && codecName != null) {
        throw new IllegalArgumentException("fileType: " + fileType +
            " which does NOT support compressed output. Please don't set codeC" +
            " or change the fileType if compressed output is desired.");
    }

    if (fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {
        Preconditions.checkNotNull(codeC, "It's essential to set compress codec"
            + " when fileType is: " + fileType);
    }

    // get the appropriate executor
    this.privExecutor = FlumeAuthenticationUtil.getAuthenticator(
            kerbConfPrincipal, kerbKeytab).proxyAs(proxyUser);

    needRounding = context.getBoolean("hdfs.round", false);

    if (needRounding) {
        String unit = context.getString("hdfs.roundUnit", "second");
        if (unit.equalsIgnoreCase("hour")) {
            this.roundUnit = Calendar.HOUR_OF_DAY;
        } else if (unit.equalsIgnoreCase("minute")) {
            this.roundUnit = Calendar.MINUTE;
        } else if (unit.equalsIgnoreCase("second")) {
            this.roundUnit = Calendar.SECOND;
        } else {
            LOG.warn("Rounding unit is not valid, please set one of" +
                "minute, hour, or second. Rounding will be disabled");
            eedRounding = false;
        }
        this.roundValue = context.getInteger("hdfs.roundValue", 1);
        if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE) {
        Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
            "Round value" +
            "must be > 0 and <= 60");
        } else if (roundUnit == Calendar.HOUR_OF_DAY) {
        Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
            "Round value" +
            "must be > 0 and <= 24");
        }
    }

    this.useLocalTime = context.getBoolean("hdfs.useLocalTimeStamp", false);
    if (useLocalTime) {
        clock = new SystemClock();
    }

    if (sinkCounter == null) {
        sinkCounter = new SinkCounter(getName());
    }
}